Hi!

Count windows are a little tricky. If you have a growing key space, the
state keeps growing.

Let's say you have a bunch of values for key "A". The count window fires
for every two elements and keeps one element. If "A" never comes again
after that, that one element is still kept around, in order to fire when
the next element for "A" comes.
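To make the effect concrete, here is a small plain-Scala model. It is a simplified illustration, not Flink code and not Flink's exact count-window semantics. The class `CountWindowModel` is hypothetical: it keeps one buffer per key, fires over every two elements, and keeps the newest element for the next window, as described above. Keys that never come back still hold one element each, so the state grows with the key space.

```scala
import scala.collection.mutable

// Simplified model, NOT the Flink API: one buffer per key for a count
// window that fires over two elements and keeps the newest one around.
final class CountWindowModel {
  private val buffers = mutable.Map.empty[String, Vector[Int]]

  // Adds an element for `key`; returns the window contents if it fired.
  def onElement(key: String, value: Int): Option[Vector[Int]] = {
    val buffer = buffers.getOrElse(key, Vector.empty[Int]) :+ value
    if (buffer.size == 2) {
      buffers(key) = buffer.takeRight(1) // carried over to the next window
      Some(buffer)
    } else {
      buffers(key) = buffer
      None
    }
  }

  // Total number of elements held in state across all keys.
  def bufferedElements: Int = buffers.values.map(_.size).sum

  // Number of keys that currently hold state.
  def keys: Int = buffers.size
}

object CountWindowDemo {
  def main(args: Array[String]): Unit = {
    val model = new CountWindowModel
    // Key "A" gets several values, then never appears again.
    (1 to 5).foreach(v => model.onElement("A", v))
    // A growing key space: each new key is seen only once.
    (1 to 1000).foreach(i => model.onElement(s"key-$i", i))
    println(s"keys=${model.keys} buffered=${model.bufferedElements}")
  }
}
```

Running `CountWindowDemo` prints `keys=1001 buffered=1001`: nothing is ever removed, so every key seen so far leaves one element behind.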

That is just the nature of count windows. In practice, count windows usually
need a "timeout" after which they are cleared. Think of it as a session
window (cleared after x hours of inactivity) inside which there are
repeated count-based triggers.
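The same kind of simplified model can be extended with such a timeout. This sketch does not use Flink classes and is not how Flink implements session windows; it only models the effect. The class `TimedCountWindowModel`, its `gap` parameter, and the `expire(now)` call are hypothetical: each key remembers when it was last seen, and keys idle for longer than `gap` lose all their state. Time is passed explicitly as a `Long` (e.g. milliseconds) to keep the model deterministic.

```scala
import scala.collection.mutable

// Simplified model, NOT the Flink API: count-based firing per key plus an
// inactivity timeout that clears a key's state (similar to a session gap).
final class TimedCountWindowModel(gap: Long) {
  private val buffers = mutable.Map.empty[String, Vector[Int]]
  private val lastSeen = mutable.Map.empty[String, Long]

  // Adds an element at time `now`; returns the window contents if it fired.
  def onElement(key: String, value: Int, now: Long): Option[Vector[Int]] = {
    lastSeen(key) = now
    val buffer = buffers.getOrElse(key, Vector.empty[Int]) :+ value
    if (buffer.size == 2) {
      buffers(key) = buffer.takeRight(1) // carried over to the next window
      Some(buffer)
    } else {
      buffers(key) = buffer
      None
    }
  }

  // Drops all state of keys that have been inactive for longer than `gap`.
  def expire(now: Long): Unit = {
    val idle = lastSeen.collect { case (k, t) if now - t > gap => k }.toList
    idle.foreach { k =>
      buffers.remove(k)
      lastSeen.remove(k)
    }
  }

  // Number of keys that currently hold state.
  def keys: Int = buffers.size
}

object TimedCountWindowDemo {
  def main(args: Array[String]): Unit = {
    val model = new TimedCountWindowModel(gap = 3600000L) // one hour
    model.onElement("A", 1, now = 0L)
    model.onElement("B", 1, now = 0L)
    model.onElement("B", 2, now = 5400000L) // "B" is still active
    model.expire(now = 5400000L)            // "A" has been idle for 1.5 hours
    println(s"keys=${model.keys}")          // only "B" remains
  }
}
```

With the timeout, the state is bounded by the keys that were active within the last `gap`, instead of by every key ever seen.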

Does that explain your case?

Greetings,
Stephan


On Thu, Aug 18, 2016 at 8:11 AM, Janardhan Reddy <
janardhan.re...@olacabs.com> wrote:

> Hi,
>
> I am noticing that the checkpointed state has been constantly growing for
> the subtask below. Shouldn't only the elements of the currently active
> windows be checkpointed? Why is it constantly growing?
>
> finalStream.keyBy("<>").countWindow(2,1)
>   .apply((_, _, input: scala.Iterable[], out: Collector[]) => {
>     val inputArray = input.toArray
>
> ... do something
>
> })
>
>
