Windows are created on demand, i.e., each time a new record arrives and there is no window for it yet, a new window is created.
Windows accept data until their retention time (which you can configure via .until()) has passed. Thus, you will have many windows open in parallel. If you read older data, records are simply put into the corresponding windows (as long as the window retention time has not passed). If a window was already discarded, a new window containing this single (late-arriving) record is created, the computation is triggered, you get a result, and afterwards the window is deleted again (as its retention time has already passed).

The retention time is driven by "stream-time", an internally tracked time that only progresses in the forward direction. It gets its value from the timestamps provided by the TimestampExtractor -- thus, by default it is event-time.

-Matthias

On 12/11/16 3:47 PM, Jon Yeargers wrote:
> I've read this and still have more questions than answers. If my data skips
> about (timewise), what determines when a given window will start / stop
> accepting new data? What if I'm reading data from some time ago?
>
> On Sun, Dec 11, 2016 at 2:22 PM, Matthias J. Sax <matth...@confluent.io>
> wrote:
>
>> Please have a look here:
>>
>> http://docs.confluent.io/current/streams/developer-guide.html#windowing-a-stream
>>
>> If you have further questions, just follow up :)
>>
>> -Matthias
>>
>> On 12/10/16 6:11 PM, Jon Yeargers wrote:
>>> I've added the 'until()' clause to some aggregation steps and it's working
>>> wonders for keeping the size of the state store in useful boundaries... But
>>> I'm not 100% clear on how it works.
>>>
>>> What is implied by the '.until()' clause? What determines when to stop
>>> receiving further data - is it clock time (since the window was created)?
>>> It seems problematic for it to refer to EventTime as this may bounce all
>>> over the place. For non-overlapping windows a given record can only fall
>>> into a single aggregation period - so when would a value get discarded?
>>>
>>> I'm using 'groupByKey().aggregate(..., TimeWindows.of(60 *
>>> 1000L).until(10 * 1000L))' - but what is this accomplishing?
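To make the retention semantics above concrete, here is a minimal, self-contained Java sketch of the bookkeeping described in the answer: tumbling windows are aligned to the epoch, and a (possibly late) record is still accepted as long as stream-time has not advanced past the window end plus the retention time. The helper names (windowStart, isAccepted) are illustrative only, not Kafka Streams API:

```java
// Sketch of tumbling-window retention semantics, assuming epoch-aligned
// windows and forward-only stream-time. Illustrative, not Kafka internals.
public class WindowRetentionSketch {

    // Tumbling (non-overlapping) windows: each record falls into exactly
    // one window, whose start is aligned to the epoch.
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    // A record is still accepted into its window as long as stream-time
    // (the max timestamp seen so far) has not passed windowEnd + retention.
    static boolean isAccepted(long recordTs, long streamTimeMs,
                              long windowSizeMs, long retentionMs) {
        long windowEnd = windowStart(recordTs, windowSizeMs) + windowSizeMs;
        return streamTimeMs < windowEnd + retentionMs;
    }

    public static void main(String[] args) {
        long size = 60_000L;       // 1-minute tumbling windows
        long retention = 300_000L; // keep windows for 5 minutes (.until())

        // A record at t=65s falls into window [60s, 120s).
        System.out.println(windowStart(65_000L, size));                    // 60000

        // Stream-time at 200s: the window is still retained (120s + 300s = 420s).
        System.out.println(isAccepted(65_000L, 200_000L, size, retention)); // true

        // Stream-time at 500s: retention has passed, the window was discarded;
        // a late record would open a fresh single-record window instead.
        System.out.println(isAccepted(65_000L, 500_000L, size, retention)); // false
    }
}
```

Note that because stream-time is derived from record timestamps (event-time by default), wall-clock time plays no role here: a window closes only when later-timestamped records push stream-time past its retention boundary.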