Hi, what is the input for each of those outputs? Could you maybe print this:
System.out.println(in + “, current booking count "+value) Also, what is the key that you specify for your KeyedStream? Cheers, Aljoscha > On 23 Mar 2016, at 11:53, Balaji Rajagopalan <balaji.rajagopa...@olacabs.com> > wrote: > > I wrote the below code which will increment a counter for the data in the > datastream, and when I print the counter each time it seems the value is > reinitialised to 0, and it is not incrementing, any thoughts. > > class BookingCntFlatMapFunction extends > RichFlatMapFunction[(Booking,Long,Long),(Booking,Long,Long)] > { > > > @transient var bookingCnt:ValueState[Int] = null > > override def flatMap(in: (Booking, Long, Long), out: Collector[(Booking, > Long, Long)]): Unit = { > var value = bookingCnt.value() > value += 1 > System.out.println("current booking count "+value) > > bookingCnt.update(value) > out.collect(in) > } > > override def open( config:Configuration): Unit = { > val descriptor: ValueStateDescriptor[Int] = new > ValueStateDescriptor[Int]("bookingcnt", > TypeInformation.of(new TypeHint[Int]() {}),0) > bookingCnt = getRuntimeContext().getState(descriptor); > > } > > > } > Output of the program: > current booking count 1 > current booking count 1 > current booking count 1 > current booking count 1 > current booking count 1 > current booking count 1 > current booking count 1 >