Hi,
the output at 19:44:44.635 is indeed strange. Is this reproducible?

As for the removal of windows: that is a pitfall a lot of users have fallen 
into. The timeWindowAll() call just sets up a window assigner, so in your case 
the equivalent call would be:

     .flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
     .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(60))) // <- difference is here
     .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(5)))
     .fold(Set[String]()){(r,i) => { r + i}}

The window assigner itself does not do any cleanup or triggering of window 
processing. It does, however, come with a default Trigger, which is 
ProcessingTimeTrigger in the case of TumblingProcessingTimeWindows. This trigger 
will fire once at the end of a window and then also purge the window contents. 
Calling trigger() replaces the default trigger, and 
ContinuousProcessingTimeTrigger only fires; it does not clean up (purge) windows.
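
To make the difference concrete, here is a small toy model in plain Scala. It 
does not use any Flink classes: the names TriggerModel, Policy, continuousOnly, 
defaultLike and continuousWithPurge are made up for illustration, and the model 
assumes one tick per second, with timers firing before the elements that arrive 
in the same second. Treat it as a sketch of the fire vs. fire-and-purge 
semantics described above, not as how Flink implements them.

```scala
// Toy model only: no Flink classes are used, all names are hypothetical.
object TriggerModel {
  sealed trait Result
  case object Fire extends Result         // emit the window contents, keep them
  case object FireAndPurge extends Result // emit the window contents, then drop them

  // A policy decides, for a window (identified by its start) at time t,
  // whether something should happen. Times are in seconds.
  type Policy = (Long, Long) => Option[Result]

  val windowSize = 60L
  val interval = 5L

  // Like replacing the default trigger with a continuous one: fires only.
  val continuousOnly: Policy =
    (start, t) => if (t % interval == 0) Some(Fire) else None

  // Like the default trigger: fire once at the window end and purge.
  val defaultLike: Policy =
    (start, t) => if (t == start + windowSize) Some(FireAndPurge) else None

  // Early firings every `interval` seconds plus fire-and-purge at the end.
  val continuousWithPurge: Policy =
    (start, t) =>
      if (t == start + windowSize) Some(FireAndPurge)
      else if (t % interval == 0) Some(Fire)
      else None

  // Runs the model from second 0 to `until` (inclusive).
  // Returns the emitted (time, distinctCount) records and the windows
  // that are still held in state at the end.
  def run(policy: Policy, input: Seq[(Long, String)], until: Long)
      : (Vector[(Long, Int)], Map[Long, Set[String]]) = {
    var windows = Map.empty[Long, Set[String]]
    var emitted = Vector.empty[(Long, Int)]
    for (t <- 0L to until) {
      // Timers for time t are evaluated for every window still in state.
      for (start <- windows.keys.toSeq.sorted) {
        policy(start, t) match {
          case Some(Fire) =>
            emitted :+= ((t, windows(start).size))
          case Some(FireAndPurge) =>
            emitted :+= ((t, windows(start).size))
            windows -= start
          case None => ()
        }
      }
      // Elements arriving at t are assigned to their tumbling window.
      for ((at, word) <- input if at == t) {
        val start = at - at % windowSize
        windows = windows.updated(start, windows.getOrElse(start, Set.empty[String]) + word)
      }
    }
    (emitted, windows)
  }
}
```

Feeding it something like your test input, for example 
Seq((1L, "aaa"), (2L, "aaa"), (12L, "bbb"), (72L, "ccc")) up to second 130, 
continuousOnly still holds both windows in state at the end, while defaultLike 
and continuousWithPurge end with empty state. The third policy is roughly the 
shape of the custom trigger I suggested earlier.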

This is something that seems to happen to a lot of people, so I started 
an initiative to try and improve windows/triggers: 
https://mail-archives.apache.org/mod_mbox/flink-dev/201603.mbox/%3c16991435-118a-403b-b766-634908325...@apache.org%3e

I created an associated doc to keep track of my proposed changes: 
https://docs.google.com/document/d/1Xp-YBf87vLTduYSivgqWVEMjYUmkA-hyb4muX3KRl08/edit?usp=sharing

What do you think?

Cheers,
Aljoscha
> On 23 Mar 2016, at 07:52, Hironori Ogibayashi <ogibaya...@gmail.com> wrote:
> 
> Aljoscha,
> 
> Thank you for fixing the issue.
> I built both the Flink server and the job with the code you provided, and it
> worked almost as expected.
> The output is below. I am wondering why a value was emitted at
> 19:44:44.635 even though I set
> ContinuousProcessingTimeTrigger.of(Time.seconds(5)), but it's not a
> problem for me.
> 
> ---
> (2016-03-22 19:44:35.002,1)
> (2016-03-22 19:44:44.635,2)
> (2016-03-22 19:44:45.001,2)
> (2016-03-22 19:45:45.001,1)
> ---
> 
> And regarding the removal from the window, you mean the data remains
> in the window even if
> I use both .timeWindowAll and .trigger(ContinuousProcessingTimeTrigger)?
> I thought that ContinuousProcessingTimeTrigger works on top of
> timeWindowAll and timeWindowAll
> take care of purging data from the window.
> 
> ---
> .timeWindowAll(Time.of(60, TimeUnit.SECONDS))
>      .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(5)))
> ---
> 
> Regards,
> Hironori
> 
> 2016-03-21 18:56 GMT+09:00 Aljoscha Krettek <aljos...@apache.org>:
>> Hi,
>> I’m afraid you discovered a bug in the ContinuousProcessingTimeTrigger. The 
>> timer is not correctly set. You can try it with this fixed version, that I 
>> will also update in the Flink code: 
>> https://gist.github.com/aljoscha/cbdbd62932b6dd2d1930
>> 
>> One more thing, the ContinuousProcessingTimeTrigger will never remove the 
>> window. The default EventTimeTrigger will fire a window and purge the 
>> contents while the ContinuousProcessingTimeTrigger will only ever fire for a 
>> window. This means that you will have a lot of windows hanging around in 
>> your state at some points and they will never be cleaned up. For now, if you 
>> require the behavior of continuously firing on a TimeWindow I would suggest 
>> to write a custom Trigger based on EventTimeTrigger (or 
>> ProcessingTimeTrigger) that does the firing and purging on time and also has 
>> the continuous triggering at earlier times.
>> 
>> Let us know if you need more information about this. Kostas Kloudas also 
>> recently looked into writing custom Triggers, so maybe he has some material 
>> he could give to you.
>> 
>> Cheers,
>> Aljoscha
>>> On 18 Mar 2016, at 05:35, Hironori Ogibayashi <ogibaya...@gmail.com> wrote:
>>> 
>>> Hello,
>>> 
>>> I have a question about TumblingProcessingTimeWindow and
>>> ContinuousProcessingTimeTrigger.
>>> 
>>> The code I tried is below. Output the distinct count of the words,
>>> counts are printed every 5 seconds and window is reset every 1 minute.
>>> 
>>> ---
>>>   val input = env.readFileStream(fileName,100,FileMonitoringFunction.WatchType.PROCESS_ONLY_APPENDED)
>>>     .flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
>>>     .timeWindowAll(Time.of(60, TimeUnit.SECONDS))
>>>     .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(5)))
>>>     .fold(Set[String]()){(r,i) => { r + i}}
>>>     .map{x => (new Timestamp(System.currentTimeMillis()), x.size)}
>>> 
>>>   input print
>>> ---
>>> 
>>> I wrote data to the input file with some interval.
>>> 
>>> ---
>>> echo "aaa" >> input.txt
>>> echo "aaa" >> input.txt
>>> sleep 10
>>> echo "bbb" >> input.txt
>>> sleep 60
>>> echo "ccc" >> input.txt
>>> ---
>>> 
>>> The result I got was just 1 record. The expected output was 1 -> (10+
>>> sec later) 2 -> (60+ sec later) 1 .
>>> ---
>>> (2016-03-18 13:08:59.288,2)
>>> ---
>>> 
>>> Even after several minutes, I never got an additional record. In my
>>> understanding, with
>>> ContinuousProcessingTimeTrigger.of(Time.seconds(5)), the last two
>>> operators (fold, map) in the code above will be evaluated every 5
>>> seconds.
>>> Am I misunderstanding something?
>>> 
>>> Regards,
>>> Hironori
>> 
