Hi! Count windows are a little tricky. If you have a growing key space, the state keeps growing.
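To show why the state keeps growing, here is a small plain-Scala model of the per-key buffer behind a sliding count window such as countWindow(2, 1). It is not Flink code and not how Flink implements windows internally. Each key keeps its most recent `size` elements, and the window fires every `slide` elements. The class name `CountWindowStateModel` and the optional `timeoutMs` parameter are hypothetical. The timeout only mimics the effect of the inactivity cleanup suggested below. In a real job that cleanup would have to come from a trigger or timer that clears the window state.

```scala
import scala.collection.mutable

// Hypothetical plain-Scala model (NOT the Flink API) of the per-key state of a
// sliding count window: keep the last `size` elements, fire every `slide` elements.
// `timeoutMs` is an assumed inactivity timeout; None means state is never cleared.
final class CountWindowStateModel[K, V](size: Int, slide: Int, timeoutMs: Option[Long]) {
  require(size > 0 && slide > 0, "size and slide must be positive")

  // Buffered elements per key: this is the part that would end up in checkpoints.
  private val buffers = mutable.Map.empty[K, Vector[V]]
  // Elements received per key since that key's last firing.
  private val sinceFire = mutable.Map.empty[K, Int]
  // Timestamp (ms) of the last element seen for each key.
  private val lastSeen = mutable.Map.empty[K, Long]

  /** Adds an element and returns Some(window contents) if the window fires. */
  def add(key: K, value: V, nowMs: Long): Option[Vector[V]] = {
    expire(nowMs)
    // Keep only the most recent `size` elements for this key.
    val buf = (buffers.getOrElse(key, Vector.empty[V]) :+ value).takeRight(size)
    buffers(key) = buf
    lastSeen(key) = nowMs
    val n = sinceFire.getOrElse(key, 0) + 1
    if (n >= slide) { sinceFire(key) = 0; Some(buf) }
    else { sinceFire(key) = n; None }
  }

  /** Drops all state of keys that have been inactive for longer than the timeout. */
  def expire(nowMs: Long): Unit = timeoutMs.foreach { t =>
    val stale = lastSeen.iterator.collect { case (k, ts) if nowMs - ts > t => k }.toList
    stale.foreach { k => buffers.remove(k); sinceFire.remove(k); lastSeen.remove(k) }
  }

  /** Total number of buffered elements across all keys. */
  def bufferedElements: Int = buffers.valuesIterator.map(_.size).sum

  /** Number of keys that still hold window state. */
  def liveKeys: Int = buffers.size
}
```

Feeding 1,000 distinct keys once each into the model with `None` as the timeout leaves 1,000 buffered elements that are never released, which matches the growth described above. With `Some(100L)`, only the keys seen within the last 100 ms keep any state.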
Let's say you have a bunch of values for key "A". With countWindow(2, 1), each window covers the last two elements and slides by one, so after a window fires, elements stay buffered to become part of the next window. If "A" never comes again after that, those buffered elements are still kept around, in order to fire when the next element for "A" arrives. That is just the nature of count windows.

In practice, count windows will mostly need a "timeout" after which they are cleared. Think of it as a session window (cleared after x hours of inactivity) inside which there are repeated count-based triggers.

Does that explain your case?

Greetings,
Stephan

On Thu, Aug 18, 2016 at 8:11 AM, Janardhan Reddy <janardhan.re...@olacabs.com> wrote:

> Hi,
>
> I am noticing that the checkpointing state has been constantly growing for
> the below subtask. Only the current active window elements should be
> checkpointed? Why is it constantly growing?
>
> finalStream.keyBy("<>").countWindow(2,1)
>   .apply((_, _, input: scala.Iterable[], out: Collector[]) => {
>     val inputArray = input.toArray
>
>     ... do something
>
>   }