I also thought that the checkpointing state size was growing due to growing
key space, i registered processing time timer on 'onevent' and wrote a
custom trigger and still the checkpointing state size was growing.

Our code is linked with flink 1.0.0 jar but was running on flink 1.1.1
(yarn session).

On running the code with flink 1.0.0(on yarn), we are no longer noticing
the issue with the same code.

On Thu, Aug 18, 2016 at 2:46 PM, Stephan Ewen <se...@apache.org> wrote:

> Hi!
>
> Count windows are a little tricky. If you have a growing key space, the
> state keeps growing.
>
> Lets say you have a bunch of values for key "A". You will fire the count
> windows for every two elements and keep one element. If "A" never comes
> again after that, the one element will still be kept around, in order to
> fire when the next element for "A" comes.
>
> That is just the nature of count windows. In practice, count windows will
> mostly need a "timeout" after which they will be cleared. Think of it as a
> session window (cleared when there are x hours of inactivity) inside which
> there are repeated count-based triggers.
>
> Does that explain your case?
>
> Greetings,
> Stephan
>
>
> On Thu, Aug 18, 2016 at 8:11 AM, Janardhan Reddy <
> janardhan.re...@olacabs.com> wrote:
>
>> Hi,
>>
>> I am noticing that the checkpointing state has been constantly growing
>> for the below subtask. Only the current active window elements should be
>> checkpointed ? why is it constantly growing ?
>>
>> finalStream.keyBy("<>").countWindow(2,1)
>>   .apply((_, _, input: scala.Iterable[], out: Collector[]) => {
>>     val inputArray = input.toArray
>>
>> ... do something
>>
>> }
>>
>>
>

Reply via email to