Never mind, Aljoscha, I understand what is going on: the state is kept per
key, so for each unique key the count starts from 0.
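
To illustrate the behaviour, here is a minimal sketch in plain Scala (not Flink itself). It assumes that keyed state such as ValueState is scoped to the key of the current element, so every key gets its own counter starting from the descriptor's default of 0. The names `KeyedCounter` and `increment` are hypothetical, the keys are made up, and the mutable map only stands in for Flink's per-key state.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a per-key ValueState[Int]: one counter per key,
// each starting from the default value 0 (as in the quoted descriptor).
class KeyedCounter {
  private val counts = mutable.Map.empty[String, Int].withDefaultValue(0)

  // Mimics bookingCnt.value() followed by bookingCnt.update(value + 1)
  // for the key of the current element; returns the updated count.
  def increment(key: String): Int = {
    val next = counts(key) + 1
    counts(key) = next
    next
  }
}

object KeyedCounterDemo {
  def main(args: Array[String]): Unit = {
    val counter = new KeyedCounter
    // Made-up keys standing in for the email ids used as the stream key.
    val keys = Seq("a@example.com", "b@example.com", "a@example.com")
    keys.foreach { k =>
      println(s"$k, current booking count ${counter.increment(k)}")
    }
  }
}
```

In this sketch a count above 1 appears only when the same key is seen again; a stream in which every element has a different key prints 1 each time.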

On Wed, Mar 23, 2016 at 4:37 PM, Balaji Rajagopalan <
balaji.rajagopa...@olacabs.com> wrote:

> (Booking(te7uc4,compact,j...@gmail.com,Mon Feb 29 19:19:40 IST
> 2016),1458730980000,1458731040000)current booking count 1
> (Booking(tdr1ym,compact,er...@gmail.com,Mon Feb 29 18:41:07 IST
> 2016),1458730980000,1458731040000)current booking count 1
> (Booking(t9zvqw,compact,yas...@gmail.com,Mon Feb 29 19:19:40 IST
> 2016),1458730980000,1458731040000)current booking count 1
> (Booking(tdr1e8,compact,k....@gmail.com,Mon Feb 29 18:41:07 IST
> 2016),1458730980000,1458731040000)current booking count 1
> (Booking(tdntcj,compact,e...@gmail.com,Mon Feb 29 19:19:40 IST
> 2016),1458730980000,1458731040000)current booking count 1
> (Booking(tdr1wv,compact,e...@gmail.com,Mon Feb 29 18:41:07 IST
> 2016),1458730980000,1458731040000)current booking count 1
> (Booking(tdr1wv,compact,er...@yahoo.in,Mon Feb 29 18:41:07 IST
> 2016),1458730980000,1458731040000)current booking count 1
>
> The key is email id from the booking object.
>
>
> On Wed, Mar 23, 2016 at 4:32 PM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> Hi,
>> what is the input for each of those outputs? Could you maybe print this:
>>
>>     System.out.println(in + ", current booking count " + value)
>>
>> Also, what is the key that you specify for your KeyedStream?
>>
>> Cheers,
>> Aljoscha
>> > On 23 Mar 2016, at 11:53, Balaji Rajagopalan <
>> balaji.rajagopa...@olacabs.com> wrote:
>> >
>> > I wrote the code below, which increments a counter for each element in
>> > the DataStream. When I print the counter, the value seems to be
>> > reinitialised to 0 every time and never increments. Any thoughts?
>> >
>> > class BookingCntFlatMapFunction
>> >     extends RichFlatMapFunction[(Booking, Long, Long), (Booking, Long, Long)] {
>> >
>> >   @transient var bookingCnt: ValueState[Int] = null
>> >
>> >   override def flatMap(in: (Booking, Long, Long),
>> >                        out: Collector[(Booking, Long, Long)]): Unit = {
>> >     var value = bookingCnt.value()
>> >     value += 1
>> >     System.out.println("current booking count " + value)
>> >
>> >     bookingCnt.update(value)
>> >     out.collect(in)
>> >   }
>> >
>> >   override def open(config: Configuration): Unit = {
>> >     val descriptor: ValueStateDescriptor[Int] =
>> >       new ValueStateDescriptor[Int]("bookingcnt",
>> >         TypeInformation.of(new TypeHint[Int]() {}), 0)
>> >     bookingCnt = getRuntimeContext().getState(descriptor)
>> >   }
>> > }
>> > Output of the program:
>> > current booking count 1
>> > current booking count 1
>> > current booking count 1
>> > current booking count 1
>> > current booking count 1
>> > current booking count 1
>> > current booking count 1
>> >
>>
>>
>
