Never mind, Aljoscha, I understand what is going on: the ValueState is scoped to the key, so for each unique key the count starts from the default of 0.
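To illustrate the per-key behaviour, here is a minimal plain-Scala sketch. It does not use Flink at all: a mutable map stands in for keyed state, and the object name `KeyedCounterSketch`, the helper `countPerKey`, and the example keys are all made up for illustration.

```scala
import scala.collection.mutable

// Plain-Scala stand-in for keyed state: one counter per key, each starting
// from the default value 0. Hypothetical helper, not part of the Flink API.
object KeyedCounterSketch {
  def countPerKey(keys: Seq[String]): Seq[(String, Int)] = {
    val state = mutable.Map.empty[String, Int].withDefaultValue(0)
    keys.map { key =>
      val updated = state(key) + 1 // read only this key's counter
      state(key) = updated         // write back only this key's counter
      (key, updated)
    }
  }

  def main(args: Array[String]): Unit = {
    // Made-up keys standing in for the email ids used as the stream key.
    val keys = Seq("a@example.com", "b@example.com", "a@example.com")
    countPerKey(keys).foreach { case (k, c) =>
      println(s"$k current booking count $c")
    }
  }
}
```

A key seen for the first time always reports 1, and only a repeated key reports 2 or more, which matches an output where every email id is different. If a single running total across all bookings is wanted, one option (an assumption on my side, not something confirmed in this thread) would be to key the stream by a value shared by all records.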

On Wed, Mar 23, 2016 at 4:37 PM, Balaji Rajagopalan <balaji.rajagopa...@olacabs.com> wrote:

> (Booking(te7uc4,compact,j...@gmail.com,Mon Feb 29 19:19:40 IST 2016),1458730980000,1458731040000)current booking count 1
> (Booking(tdr1ym,compact,er...@gmail.com,Mon Feb 29 18:41:07 IST 2016),1458730980000,1458731040000)current booking count 1
> (Booking(t9zvqw,compact,yas...@gmail.com,Mon Feb 29 19:19:40 IST 2016),1458730980000,1458731040000)current booking count 1
> (Booking(tdr1e8,compact,k....@gmail.com,Mon Feb 29 18:41:07 IST 2016),1458730980000,1458731040000)current booking count 1
> (Booking(tdntcj,compact,e...@gmail.com,Mon Feb 29 19:19:40 IST 2016),1458730980000,1458731040000)current booking count 1
> (Booking(tdr1wv,compact,e...@gmail.com,Mon Feb 29 18:41:07 IST 2016),1458730980000,1458731040000)current booking count 1
> (Booking(tdr1wv,compact,er...@yahoo.in,Mon Feb 29 18:41:07 IST 2016),1458730980000,1458731040000)current booking count 1
>
> The key is email id from the booking object.
>
> On Wed, Mar 23, 2016 at 4:32 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
>
>> Hi,
>> what is the input for each of those outputs? Could you maybe print this:
>>
>> System.out.println(in + ", current booking count "+value)
>>
>> Also, what is the key that you specify for your KeyedStream?
>>
>> Cheers,
>> Aljoscha
>>
>> > On 23 Mar 2016, at 11:53, Balaji Rajagopalan <balaji.rajagopa...@olacabs.com> wrote:
>> >
>> > I wrote the below code which will increment a counter for the data in the datastream, and when I print the counter each time it seems the value is reinitialised to 0, and it is not incrementing, any thoughts.
>> >
>> > class BookingCntFlatMapFunction extends RichFlatMapFunction[(Booking,Long,Long),(Booking,Long,Long)]
>> > {
>> >
>> >   @transient var bookingCnt:ValueState[Int] = null
>> >
>> >   override def flatMap(in: (Booking, Long, Long), out: Collector[(Booking, Long, Long)]): Unit = {
>> >     var value = bookingCnt.value()
>> >     value += 1
>> >     System.out.println("current booking count "+value)
>> >
>> >     bookingCnt.update(value)
>> >     out.collect(in)
>> >   }
>> >
>> >   override def open(config: Configuration): Unit = {
>> >     val descriptor: ValueStateDescriptor[Int] = new ValueStateDescriptor[Int]("bookingcnt",
>> >       TypeInformation.of(new TypeHint[Int]() {}), 0)
>> >     bookingCnt = getRuntimeContext().getState(descriptor)
>> >   }
>> >
>> > }
>> >
>> > Output of the program:
>> > current booking count 1
>> > current booking count 1
>> > current booking count 1
>> > current booking count 1
>> > current booking count 1
>> > current booking count 1
>> > current booking count 1