Hi Ryan,

I am using mapWithState after doing reduceByKey.

I am now using mapWithState as you suggested and triggering the count
manually.

However, I still cannot see any checkpointing taking place. In the DAG I can
see that the reduceByKey operations for the previous batches are also being
computed.


Thanks
Abhi


On Tue, Feb 23, 2016 at 2:36 AM, Shixiong(Ryan) Zhu <shixi...@databricks.com
> wrote:

> Hey Abhi,
>
> Using reduceByKeyAndWindow and mapWithState will trigger the bug
> in SPARK-6847. Here is a workaround to trigger the checkpoint manually:
>
>     JavaMapWithStateDStream<...> stateDStream =
>         myPairDstream.mapWithState(StateSpec.function(mappingFunc));
>     stateDStream.foreachRDD(new Function<...>() {
>       @Override
>       public Void call(JavaRDD<...> rdd) throws Exception {
>         rdd.count(); // count() is an action, so it forces the RDD to be computed
>         return null; // a Void return type still needs an explicit return
>       }
>     });
>     return stateDStream.stateSnapshots();
>
>
> On Mon, Feb 22, 2016 at 12:25 PM, Abhishek Anand <abhis.anan...@gmail.com>
> wrote:
>
>> Hi Ryan,
>>
>> Reposting the code.
>>
>> My use case is as follows: I receive web impression logs and may get the
>> notify (listening from Kafka) for those impressions in the same interval
>> (for me it's 1 min) or in any later interval (up to 2 hours). When I receive
>> the notify for a particular impression, I need to swap the date field in the
>> impression with the date field in the notify log. The notify for an
>> impression has the same key as the impression.
>>
>> static Function3<String, Optional<MyClass>, State<MyClass>, Tuple2<String, MyClass>> mappingFunc =
>>     new Function3<String, Optional<MyClass>, State<MyClass>, Tuple2<String, MyClass>>() {
>>   @Override
>>   public Tuple2<String, MyClass> call(String key, Optional<MyClass> one, State<MyClass> state) {
>>     MyClass nullObj = new MyClass();
>>     nullObj.setImprLog(null);
>>     nullObj.setNotifyLog(null);
>>     MyClass current = one.or(nullObj);
>>
>>     if (current != null && current.getImprLog() != null
>>         && current.getMyClassType() == 1 /* this is an impression */) {
>>       return new Tuple2<>(key, null);
>>     } else if (current.getNotifyLog() != null
>>         && current.getMyClassType() == 3 /* notify for the impression received */) {
>>       MyClass oldState = (state.exists() ? state.get() : nullObj);
>>       if (oldState != null && oldState.getNotifyLog() != null) {
>>         // swapping the dates
>>         oldState.getImprLog().setCrtd(current.getNotifyLog().getCrtd());
>>         return new Tuple2<>(key, oldState);
>>       } else {
>>         return new Tuple2<>(key, null);
>>       }
>>     } else {
>>       return new Tuple2<>(key, null);
>>     }
>>   }
>> };
>>
>>
>> return myPairDstream.mapWithState(StateSpec.function(mappingFunc)).stateSnapshots();
>>
>>
>> Currently I am using reduceByKeyAndWindow without the inverse function,
>> and I am able to get the correct data. But an issue might arise when I
>> have to restart my application from the checkpoint: it repartitions and
>> computes the previous 120 partitions, which delays the incoming batches.
>>
>>
>> Thanks !!
>> Abhi
>>
>> On Tue, Feb 23, 2016 at 1:25 AM, Shixiong(Ryan) Zhu <
>> shixi...@databricks.com> wrote:
>>
>>> Hey Abhi,
>>>
>>> Could you post how you use mapWithState? By default, it should do
>>> checkpointing every 10 batches.
>>> However, there is a known issue that prevents mapWithState from
>>> checkpointing in some special cases:
>>> https://issues.apache.org/jira/browse/SPARK-6847
>>>
>>> On Mon, Feb 22, 2016 at 5:47 AM, Abhishek Anand <abhis.anan...@gmail.com
>>> > wrote:
>>>
>>>> Any insights on this one?
>>>>
>>>>
>>>> Thanks !!!
>>>> Abhi
>>>>
>>>> On Mon, Feb 15, 2016 at 11:08 PM, Abhishek Anand <
>>>> abhis.anan...@gmail.com> wrote:
>>>>
>>>>> I am now trying to use mapWithState in the following way, based on some
>>>>> example code. But looking at the DAG, it does not seem to checkpoint
>>>>> the state, and when restarting the application from the checkpoint, it
>>>>> repartitions all the previous batches' data from Kafka.
>>>>>
>>>>> static Function3<String, Optional<MyClass>, State<MyClass>, Tuple2<String, MyClass>> mappingFunc =
>>>>>     new Function3<String, Optional<MyClass>, State<MyClass>, Tuple2<String, MyClass>>() {
>>>>>   @Override
>>>>>   public Tuple2<String, MyClass> call(String key, Optional<MyClass> one, State<MyClass> state) {
>>>>>     MyClass nullObj = new MyClass();
>>>>>     nullObj.setImprLog(null);
>>>>>     nullObj.setNotifyLog(null);
>>>>>     MyClass current = one.or(nullObj);
>>>>>
>>>>>     if (current != null && current.getImprLog() != null
>>>>>         && current.getMyClassType() == 1) {
>>>>>       return new Tuple2<>(key, null);
>>>>>     } else if (current.getNotifyLog() != null
>>>>>         && current.getMyClassType() == 3) {
>>>>>       MyClass oldState = (state.exists() ? state.get() : nullObj);
>>>>>       if (oldState != null && oldState.getNotifyLog() != null) {
>>>>>         oldState.getImprLog().setCrtd(current.getNotifyLog().getCrtd());
>>>>>         return new Tuple2<>(key, oldState);
>>>>>       } else {
>>>>>         return new Tuple2<>(key, null);
>>>>>       }
>>>>>     } else {
>>>>>       return new Tuple2<>(key, null);
>>>>>     }
>>>>>   }
>>>>> };
>>>>>
>>>>>
>>>>> Please let me know whether this is the proper way or whether I am doing
>>>>> something wrong.
>>>>>
>>>>>
>>>>> Thanks !!
>>>>> Abhi
>>>>>
>>>>> On Sun, Feb 14, 2016 at 2:26 AM, Sebastian Piu <
>>>>> sebastian....@gmail.com> wrote:
>>>>>
>>>>>> If you don't want to upgrade, your only option will be updateStateByKey.
>>>>>> On 13 Feb 2016 8:48 p.m., "Ted Yu" <yuzhih...@gmail.com> wrote:
>>>>>>
>>>>>>> mapWithState supports checkpoint.
>>>>>>>
>>>>>>> There have been some bug fixes since the release of 1.6.0,
>>>>>>> e.g.
>>>>>>>   SPARK-12591 NullPointerException using checkpointed mapWithState
>>>>>>> with KryoSerializer
>>>>>>>
>>>>>>> which is in the upcoming 1.6.1
>>>>>>>
>>>>>>> Cheers
>>>>>>>
>>>>>>> On Sat, Feb 13, 2016 at 12:05 PM, Abhishek Anand <
>>>>>>> abhis.anan...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Does mapWithState checkpoint the data?
>>>>>>>>
>>>>>>>> When my application goes down and is restarted from the checkpoint,
>>>>>>>> will mapWithState need to recompute the previous batches' data?
>>>>>>>>
>>>>>>>> Also, to use mapWithState I will need to upgrade my application, as
>>>>>>>> I am using version 1.4.0 and mapWithState isn't supported there. Is
>>>>>>>> there any other workaround?
>>>>>>>>
>>>>>>>> Cheers!!
>>>>>>>> Abhi
>>>>>>>>
>>>>>>>> On Fri, Feb 12, 2016 at 3:33 AM, Sebastian Piu <
>>>>>>>> sebastian....@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Looks like mapWithState could help you?
>>>>>>>>> On 11 Feb 2016 8:40 p.m., "Abhishek Anand" <
>>>>>>>>> abhis.anan...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi All,
>>>>>>>>>>
>>>>>>>>>> I have a use case as follows in my production environment,
>>>>>>>>>> where I am listening from Kafka with a slideInterval of 1 min and
>>>>>>>>>> a windowLength of 2 hours.
>>>>>>>>>>
>>>>>>>>>> I have a JavaPairDStream where for each key I get the same key
>>>>>>>>>> again but with a different value, which might appear in the same
>>>>>>>>>> batch or in some later batch.
>>>>>>>>>>
>>>>>>>>>> When the key appears a second time, I need to update a field in the
>>>>>>>>>> value of the earlier key with a field from the later key. Keys for
>>>>>>>>>> which the matching key does not arrive should be rejected after 2 hours.
>>>>>>>>>>
>>>>>>>>>> At the end of each second I need to output the result to an external
>>>>>>>>>> database.
>>>>>>>>>>
>>>>>>>>>> For example :
>>>>>>>>>>
>>>>>>>>>> Suppose valueX is object of MyClass with fields int a, String b
>>>>>>>>>> At t=1sec I am getting
>>>>>>>>>> key0,value0(0,"prev0")
>>>>>>>>>> key1,value1 (1, "prev1")
>>>>>>>>>> key2,value2 (2,"prev2")
>>>>>>>>>> key2,value3 (3, "next2")
>>>>>>>>>>
>>>>>>>>>> Output to database after 1 sec
>>>>>>>>>> key2, newValue (2,"next2")
>>>>>>>>>>
>>>>>>>>>> At t=2 sec getting
>>>>>>>>>> key3,value4(4,"prev3")
>>>>>>>>>> key1,value5(5,"next1")
>>>>>>>>>>
>>>>>>>>>> Output to database after 2 sec
>>>>>>>>>> key1,newValue(1,"next1")
>>>>>>>>>>
>>>>>>>>>> At t=3 sec
>>>>>>>>>> key4,value6(6,"prev4")
>>>>>>>>>> key3,value7(7,"next3")
>>>>>>>>>> key5,value5(8,"prev5")
>>>>>>>>>> key5,value5(9,"next5")
>>>>>>>>>> key0,value0(10,"next0")
>>>>>>>>>>
>>>>>>>>>> Output to database after 3 sec
>>>>>>>>>> key0,newValue(0,"next0")
>>>>>>>>>> key3,newValue(4,"next3")
>>>>>>>>>> key5,newValue(8,"next5")
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Please suggest how this can be achieved.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Thanks a lot !!!!
>>>>>>>>>> Abhi
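
The pairing rule described in the question above can be sketched in plain
Java, with no Spark dependency. This is only a rough illustration under
stated assumptions, not the approach anyone in this thread settled on. The
names PairingSketch, Value, Pending, onRecord and expire are all hypothetical,
and a HashMap stands in for the per-key state that a streaming job would keep.
Time is passed in as seconds, and the 2-hour limit is modelled as a simple
age check.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

final class PairingSketch {

    /** Hypothetical stand-in for MyClass from the question: fields int a, String b. */
    static final class Value {
        final int a;
        final String b;
        Value(int a, String b) { this.a = a; this.b = b; }
        @Override public String toString() { return "(" + a + ",\"" + b + "\")"; }
    }

    /** First value seen for a key, plus the time it arrived (in seconds). */
    private static final class Pending {
        final Value first;
        final long arrivedAtSec;
        Pending(Value first, long arrivedAtSec) {
            this.first = first;
            this.arrivedAtSec = arrivedAtSec;
        }
    }

    // Per-key state: keys that are still waiting for their second record.
    private final Map<String, Pending> state = new HashMap<>();
    private final long timeoutSec;

    PairingSketch(long timeoutSec) { this.timeoutSec = timeoutSec; }

    /**
     * Handles one record. Returns a line such as key2,newValue(2,"next2") when the
     * record completes a pair, or null when it is the first record for its key.
     */
    String onRecord(String key, Value value, long nowSec) {
        Pending pending = state.remove(key);
        if (pending == null) {
            // First occurrence: remember it and emit nothing.
            state.put(key, new Pending(value, nowSec));
            return null;
        }
        // Second occurrence: keep field a of the earlier value, take field b of the later one.
        Value merged = new Value(pending.first.a, value.b);
        return key + ",newValue" + merged;
    }

    /** Drops keys whose partner did not arrive within the timeout; returns how many were dropped. */
    int expire(long nowSec) {
        int dropped = 0;
        for (Iterator<Pending> it = state.values().iterator(); it.hasNext();) {
            if (nowSec - it.next().arrivedAtSec >= timeoutSec) {
                it.remove();
                dropped++;
            }
        }
        return dropped;
    }

    public static void main(String[] args) {
        PairingSketch sketch = new PairingSketch(2 * 60 * 60); // 2-hour timeout, in seconds
        // Input at t=1 sec from the example in the question.
        String[] keys = {"key0", "key1", "key2", "key2"};
        Value[] values = {
            new Value(0, "prev0"), new Value(1, "prev1"),
            new Value(2, "prev2"), new Value(3, "next2")
        };
        for (int i = 0; i < keys.length; i++) {
            String out = sketch.onRecord(keys[i], values[i], 1);
            if (out != null) {
                System.out.println(out);
            }
        }
    }
}
```

With the t=1 sec input, only key2 has both records, so main prints a single
line for key2. In a mapWithState function the HashMap would presumably be
replaced by the State object Spark passes in (its exists, get, update and
remove methods), and the age check by a timeout set on the StateSpec; that
mapping is an assumption here and is not shown.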
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>
>>>>
>>>
>>
>
