Hi,
So is until for future or past?
Say I get first record at t = 0 and until is 100 and my window size is 50
advance by 25.
I understand it will create windows (0, 50), (25, 75), (50, 100)
Now at t = 101 it will drop
(0, 50), (25, 75), (50, 100) and create
(101, 150), (125, 175), (150, 200)

Please confirm if this understanding us correct. It is not clear how it
will handle overlapping windows (75, 125) and (175, 225) and so on?

What case is not clear again is that at say t = 102 I get some message with
timestamp 99. What happens then?
Will the result added to previous aggregation of (50, 100) or (75, 125),
like it should.

Or it will recreate the old window (50, 100) and aggregate the value there
and then drop it. This would result is wrong aggregated value, as it does
not consider the previous aggregated values.

So this is the pressing case I am not able to understand. Maybe I am wrong
at some basic understanding.


Next for
The parameter
> windowstore.changelog.additional.retention.ms

How does this relate to rentention.ms param of topic config?
I create internal topic manually using say rentention.ms=3600000.
In next release (post kafka_2.10-0.10.0.1) since we support delete of
internal changelog topic as well and I want it to be retained for say just
1 hour.
So how does that above parameter interfere with this topic level setting.
Or now I just need to set above config as 3600000 and not add
rentention.ms=3600000
while creating internal topic.
This is just another doubt remaining here.

Thanks
Sachin



On Tue, Dec 13, 2016 at 3:02 AM, Matthias J. Sax <matth...@confluent.io>
wrote:

> Sachin,
>
> There is no reason to have an .until() AND a .retain() -- just increase
> the value of .until()
>
> If you have a window of let's say 1h size and you set .until() also to
> 1h -- you can obviously not process any late arriving data. If you set
> until() to 2h is this example, you can process data that is up to 1h
> delayed.
>
> So basically, the retention should always be larger than you window size.
>
> The parameter
> > windowstore.changelog.additional.retention.ms
>
> is applies to changelog topics that backup window state stores. Those
> changelog topics are compacted. However, the used key does encode an
> window ID and thus older data can never be cleaned up by compaction.
> Therefore, an additional retention time is applied to those topics, too.
> Thus, if an old window is not updated for this amount of time, it will
> get deleted eventually preventing this topic to grown infinitely.
>
> The value will be determined by until(), i.e., whatever you specify in
> .until() will be used to set this parameter.
>
>
> -Matthias
>
> On 12/12/16 1:07 AM, Sachin Mittal wrote:
> > Hi,
> > We are facing the exact problem as described by Matthias above.
> > We are keeping default until which is 1 day.
> >
> > Our record's times tamp extractor has a field which increases with time.
> > However for short time we cannot guarantee the time stamp is always
> > increases. So at the boundary ie after 24 hrs we can get records which
> are
> > beyond that windows retention period.
> >
> > Then it happens like it is mentioned above and our aggregation fails.
> >
> > So just to sum up when we get record
> > 24h + 1 sec (it deletes older window and since the new record belongs to
> > the new window its gets created)
> > Now when we get next record of 24 hs - 1 sec since older window is
> dropped
> > it does not get aggregated in that bucket.
> >
> > I suggest we have another setting next to until call retain which retains
> > the older windows into next window.
> >
> > I think at stream window boundary level it should use a concept of
> sliding
> > window. So we can define window like
> >
> > TimeWindows.of("test-table", 3600 * 1000l).advanceBy(1800 *
> 1000l).untill(7
> > * 24 * 3600 * 1000l).retain(900 * 1000l)
> >
> > So after 7 days it retains the data covered by windows in last 15 minutes
> > which rolls over the data in them to next window. This way streams work
> > continuously.
> >
> > Please let us know your thoughts on this.
> >
> > On another side question on this there is a setting:
> >
> > windowstore.changelog.additional.retention.ms
> > I is not clear what is does. Is this the default for until?
> >
> > Thanks
> > Sachin
> >
> >
> > On Mon, Dec 12, 2016 at 10:17 AM, Matthias J. Sax <matth...@confluent.io
> >
> > wrote:
> >
> >> Windows are created on demand, ie, each time a new record arrives and
> >> there is no window yet for it, a new window will get created.
> >>
> >> Windows are accepting data until their retention time (that you can
> >> configure via .until()) passed. Thus, you will have many windows being
> >> open in parallel.
> >>
> >> If you read older data, they will just be put into the corresponding
> >> windows (as long as window retention time did not pass). If a window was
> >> discarded already, a new window with this single (later arriving) record
> >> will get created, the computation will be triggered, you get a result,
> >> and afterwards the window is deleted again (as it's retention time
> >> passed already).
> >>
> >> The retention time is driven by "stream-time", in internal tracked time
> >> that only progressed in forward direction. It gets it value from the
> >> timestamps provided by TimestampExtractor -- thus, per default it will
> >> be event-time.
> >>
> >> -Matthias
> >>
> >> On 12/11/16 3:47 PM, Jon Yeargers wrote:
> >>> I've read this and still have more questions than answers. If my data
> >> skips
> >>> about (timewise) what determines when a given window will start / stop
> >>> accepting new data? What if Im reading data from some time ago?
> >>>
> >>> On Sun, Dec 11, 2016 at 2:22 PM, Matthias J. Sax <
> matth...@confluent.io>
> >>> wrote:
> >>>
> >>>> Please have a look here:
> >>>>
> >>>> http://docs.confluent.io/current/streams/developer-
> >>>> guide.html#windowing-a-stream
> >>>>
> >>>> If you have further question, just follow up :)
> >>>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>>
> >>>> On 12/10/16 6:11 PM, Jon Yeargers wrote:
> >>>>> Ive added the 'until()' clause to some aggregation steps and it's
> >> working
> >>>>> wonders for keeping the size of the state store in useful
> boundaries...
> >>>> But
> >>>>> Im not 100% clear on how it works.
> >>>>>
> >>>>> What is implied by the '.until()' clause? What determines when to
> stop
> >>>>> receiving further data - is it clock time (since the window was
> >> created)?
> >>>>> It seems problematic for it to refer to EventTime as this may bounce
> >> all
> >>>>> over the place. For non-overlapping windows a given record can only
> >> fall
> >>>>> into a single aggregation period - so when would a value get
> discarded?
> >>>>>
> >>>>> Im using 'groupByKey(),aggregate(..., TimeWindows.of(60 *
> >>>> 1000L).until(10 *
> >>>>> 1000L))'  - but what is this accomplishing?
> >>>>>
> >>>>
> >>>>
> >>>
> >>
> >>
> >
>
>

Reply via email to