Windows are created on demand, i.e., each time a new record arrives and
there is no window for it yet, a new window gets created.

Windows keep accepting data until their retention time (which you can
configure via .until()) has passed. Thus, you will have many windows
open in parallel.
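
For illustration, here is a minimal sketch of such a windowed
aggregation against the 0.10.1-era API (topic, store, and variable
names are made up):

    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KStreamBuilder;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.TimeWindows;
    import org.apache.kafka.streams.kstream.Windowed;

    KStreamBuilder builder = new KStreamBuilder();
    KStream<String, String> input = builder.stream("input-topic");

    // 1-minute tumbling windows, each retained for 5 minutes of
    // stream-time before it may be dropped
    KTable<Windowed<String>, Long> counts = input
        .groupByKey()
        .count(TimeWindows.of(60 * 1000L).until(5 * 60 * 1000L),
               "counts-store");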

If you read older data, the records are simply put into their
corresponding windows (as long as the window's retention time has not
passed yet). If a window was already discarded, a new window containing
just this single (late-arriving) record gets created, the computation
is triggered, you get a result, and afterwards the window is deleted
again (as its retention time has already passed).
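
If it helps, the retention check behaves roughly like the following
sketch (plain Java, not the actual Streams internals; it assumes
retention is measured from the window start, and all names are
hypothetical):

    public class RetentionSketch {
        // stream-time only ever moves forward
        static long streamTime = Long.MIN_VALUE;

        // a window is still retained only while its retention has
        // not expired relative to stream-time
        static boolean isRetained(long windowStart, long retentionMs,
                                  long recordTs) {
            streamTime = Math.max(streamTime, recordTs);
            return windowStart + retentionMs > streamTime;
        }

        public static void main(String[] args) {
            long retention = 5 * 60 * 1000L;  // 5 minutes
            // in-order record at t=30s: window [0,60s) is retained
            System.out.println(isRetained(0L, retention, 30_000L));       // true
            // record at t=10min pushes stream-time forward
            System.out.println(isRetained(600_000L, retention, 600_000L)); // true
            // late record at t=45s: the old window's retention passed
            System.out.println(isRetained(0L, retention, 45_000L));      // false
        }
    }

In the last case, as described above, Streams would still create a
fresh window for the late record, compute once, and drop it again.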

The retention time is driven by "stream-time", an internally tracked
time that only moves forward. It gets its value from the timestamps
provided by the TimestampExtractor -- thus, by default it will be
event-time.
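
For example, a custom extractor can be plugged in like this (a sketch
against the 0.10.x-era single-argument TimestampExtractor interface;
the class name is made up):

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.processor.TimestampExtractor;

    public class RecordTimeExtractor implements TimestampExtractor {
        @Override
        public long extract(final ConsumerRecord<Object, Object> record) {
            // use the timestamp embedded in the record (event-time);
            // a real extractor would often read a field of the value
            return record.timestamp();
        }

        // how it gets registered:
        public static Properties config() {
            final Properties props = new Properties();
            props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
                      RecordTimeExtractor.class.getName());
            return props;
        }
    }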

-Matthias

On 12/11/16 3:47 PM, Jon Yeargers wrote:
> I've read this and still have more questions than answers. If my data skips
> about (time-wise), what determines when a given window will start / stop
> accepting new data? What if I'm reading data from some time ago?
> 
> On Sun, Dec 11, 2016 at 2:22 PM, Matthias J. Sax <matth...@confluent.io>
> wrote:
> 
>> Please have a look here:
>>
>> http://docs.confluent.io/current/streams/developer-guide.html#windowing-a-stream
>>
>> If you have further question, just follow up :)
>>
>>
>> -Matthias
>>
>>
>> On 12/10/16 6:11 PM, Jon Yeargers wrote:
>>> I've added the 'until()' clause to some aggregation steps and it's working
>>> wonders for keeping the size of the state store in useful boundaries... But
>>> I'm not 100% clear on how it works.
>>>
>>> What is implied by the '.until()' clause? What determines when to stop
>>> receiving further data - is it clock time (since the window was created)?
>>> It seems problematic for it to refer to EventTime as this may bounce all
>>> over the place. For non-overlapping windows a given record can only fall
>>> into a single aggregation period - so when would a value get discarded?
>>>
>>> I'm using 'groupByKey().aggregate(...,
>>> TimeWindows.of(60 * 1000L).until(10 * 1000L))' - but what is this
>>> accomplishing?
>>>
>>
>>
> 
