Hi,
We are facing the exact problem described by Matthias above.
We are keeping the default until, which is 1 day.

Our record's timestamp extractor reads a field which increases with time.
However, over short periods we cannot guarantee the timestamps are always
increasing. So at the boundary, i.e. after 24 hrs, we can get records that
fall outside that window's retention period.

Then it happens exactly as described above and our aggregation fails.

So, just to sum up, when we get a record at
24h + 1 sec, the older window gets deleted, and since the new record belongs
to a new window, that window gets created.
Now when we get the next record at 24h - 1 sec, since the older window has
already been dropped, it does not get aggregated into that bucket.
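
To make this concrete, here is a rough sketch of the shape of our
aggregation (topic, store and type names are placeholders, and the exact
aggregate() overload depends on the Streams version); with until() left at
its 1-day default, the 24h - 1 sec record arrives after its window has
already been dropped:

// Sketch only -- mirrors the groupByKey().aggregate(..., TimeWindows...)
// pattern from this thread; names and overloads are placeholders.
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> events = builder.stream("events-topic");

KTable<Windowed<String>, Long> hourlyCounts = events
        .groupByKey()
        .aggregate(
                () -> 0L,                            // initializer
                (key, value, total) -> total + 1,    // aggregator
                TimeWindows.of(3600 * 1000L)         // 1h windows
                        .until(24 * 3600 * 1000L),   // retention: 1 day
                Serdes.Long(),
                "hourly-counts-store");

// Timeline of the failure:
//   t = 24h + 1s  -> stream-time passes the retention, the old window is
//                    dropped and a new one is created for this record.
//   t = 24h - 1s  -> its window is already gone, so the record ends up in
//                    a fresh single-record window instead of its bucket.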

I suggest we add another setting next to until, called retain, which
retains the older windows into the next window.

I think at the stream window boundary it should use the concept of a
sliding window, so we could define a window like:

TimeWindows.of("test-table", 3600 * 1000L).advanceBy(1800 * 1000L).until(7
* 24 * 3600 * 1000L).retain(900 * 1000L)

So after 7 days it would retain the data covered by windows from the last
15 minutes and roll that data over into the next window. This way the
stream keeps working continuously.
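
Until something like retain() exists, the only workaround we see with the
current API is to pad until() itself by the maximum lateness we expect (a
sketch below; the numbers are placeholders), but that keeps every window
around longer instead of only rolling over the ones at the boundary:

// Workaround sketch: pad the retention by the expected maximum lateness
// (here 15 minutes), so a record at 24h - 1 sec still finds its window.
long windowSizeMs  = 3600 * 1000L;                      // 1 hour windows
long maxLatenessMs = 900 * 1000L;                       // 15 minutes
long retentionMs   = 24 * 3600 * 1000L + maxLatenessMs;

TimeWindows windows = TimeWindows.of(windowSizeMs).until(retentionMs);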

Please let us know your thoughts on this.

On another side question: there is a setting,

windowstore.changelog.additional.retention.ms

It is not clear what it does. Is this the default for until?
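
For context, this is the only place we set it today (a sketch; the value is
a placeholder), and we are not sure how it relates to until():

// Sketch: the property is set on the Streams configuration like any other.
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-aggregation-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
props.put("windowstore.changelog.additional.retention.ms", "1800000"); // 30 min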

Thanks
Sachin


On Mon, Dec 12, 2016 at 10:17 AM, Matthias J. Sax <matth...@confluent.io>
wrote:

> Windows are created on demand, i.e., each time a new record arrives and
> there is no window yet for it, a new window will get created.
>
> Windows are accepting data until their retention time (which you can
> configure via .until()) has passed. Thus, you will have many windows open
> in parallel.
>
> If you read older data, it will just be put into the corresponding
> windows (as long as the window retention time has not passed). If a window
> was discarded already, a new window with this single (late-arriving)
> record will get created, the computation will be triggered, you get a
> result, and afterwards the window is deleted again (as its retention time
> has already passed).
>
> The retention time is driven by "stream-time", an internally tracked time
> that only progresses in the forward direction. It gets its value from the
> timestamps provided by the TimestampExtractor -- thus, by default it will
> be event-time.
>
> -Matthias
>
> On 12/11/16 3:47 PM, Jon Yeargers wrote:
> > I've read this and still have more questions than answers. If my data
> > skips about (timewise), what determines when a given window will start /
> > stop accepting new data? What if I'm reading data from some time ago?
> >
> > On Sun, Dec 11, 2016 at 2:22 PM, Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> >> Please have a look here:
> >>
> >> http://docs.confluent.io/current/streams/developer-guide.html#windowing-a-stream
> >>
> >> If you have further question, just follow up :)
> >>
> >>
> >> -Matthias
> >>
> >>
> >> On 12/10/16 6:11 PM, Jon Yeargers wrote:
> >>> I've added the 'until()' clause to some aggregation steps and it's
> >>> working wonders for keeping the size of the state store in useful
> >>> boundaries... But I'm not 100% clear on how it works.
> >>>
> >>> What is implied by the '.until()' clause? What determines when to stop
> >>> receiving further data - is it clock time (since the window was
> >>> created)? It seems problematic for it to refer to EventTime as this
> >>> may bounce all over the place. For non-overlapping windows a given
> >>> record can only fall into a single aggregation period - so when would
> >>> a value get discarded?
> >>>
> >>> I'm using 'groupByKey().aggregate(..., TimeWindows.of(60 *
> >>> 1000L).until(10 * 1000L))' - but what is this accomplishing?
> >>>
> >>
> >>
> >
>
>
