I am wondering about "I create internal topic manually" -- which topics
are you referring to in detail?

Kafka Streams creates all kinds of internal topics with auto-generated
names. So it would be quite tricky to create all of them manually
(especially because you need to know those names in advance).

IIRC, if a topic already exists, Kafka Streams does not change its
configuration. Only if Kafka Streams creates a topic itself will it
specify certain config parameters at topic creation time.
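
As a rough illustration of what that means for a window changelog topic
that Streams creates itself (a sketch only: the class and method names are
made up and not part of the Kafka Streams API, and the formula reflects my
understanding that windowstore.changelog.additional.retention.ms, assumed
here to default to 1 day, is added on top of the until() value):

```java
public class ChangelogRetentionSketch {

    // Hypothetical helper, not a Kafka Streams API call.
    // Assumption: topic retention = window retention (until) + additional retention.
    static long changelogRetentionMs(long untilMs, long additionalRetentionMs) {
        return untilMs + additionalRetentionMs;
    }

    public static void main(String[] args) {
        long untilMs = 60 * 60 * 1000L;            // .until() set to 1 hour
        long additionalMs = 24 * 60 * 60 * 1000L;  // assumed default of 1 day
        System.out.println("retention.ms = " + changelogRetentionMs(untilMs, additionalMs));
    }
}
```

For a topic that was created manually, none of this would apply; the
retention.ms given at creation time would stay as it is.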


-Matthias



On 12/13/16 8:16 PM, Sachin Mittal wrote:
> Hi,
> Thanks for the explanation. This illustration makes it super easy to
> understand how until works. Perhaps we can update the wiki with this
> illustration.
> It is basically the retention time for a past window.
> I used to think until creates all the future windows for that period and,
> when that time passes, deletes all the past windows. However,
> until actually retains a window for the specified time. This makes so much more
> sense.
> 
> I just had one pending query regarding:
> 
>> windowstore.changelog.additional.retention.ms
> 
> How does this relate to the retention.ms param of the topic config?
> I create internal topic manually using, say, retention.ms=3600000.
> In the next release (post kafka_2.10-0.10.0.1), since we support delete of
> internal changelog topics as well, I want it to be retained for, say, just
> 1 hour.
> So how does the above parameter interact with this topic-level setting?
> Or do I now just need to set the above config to 3600000 and not add
> retention.ms=3600000
> while creating the internal topic?
> 
> Thanks
> Sachin
> 
> 
> On Tue, Dec 13, 2016 at 11:27 PM, Matthias J. Sax <matth...@confluent.io>
> wrote:
> 
>> First, windows are only created if there is actual data for a window. So
>> you get windows [0, 50), [25, 75), [50, 100) only if there are records
>> falling into each window (btw: window start time is inclusive while
>> window end time is exclusive). If you have only 2 records with, let's say,
>> ts=20 and ts=90, you will not have an open window [25,75). Each window is
>> physically created when the first record for it is processed.
>>
>> If you have above 4 windows and a record with ts=101 arrives, a new
>> window [101,151) will be created. Window [0,50) will not be deleted yet,
>> because retention is 100 and thus Streams guarantees that all records
>> with ts >= 1 (= 101 - 100) are still processed correctly, and those
>> records would fall into window [0,50).
>>
>> Thus, window [0,50) can be dropped once time has advanced to TS = 150, but
>> not before that.
>>
>> -Matthias
>>
>>
>> On 12/13/16 12:06 AM, Sachin Mittal wrote:
>>> Hi,
>>> So is until for future or past?
>>> Say I get first record at t = 0 and until is 100 and my window size is 50
>>> advance by 25.
>>> I understand it will create windows (0, 50), (25, 75), (50, 100)
>>> Now at t = 101 it will drop
>>> (0, 50), (25, 75), (50, 100) and create
>>> (101, 150), (125, 175), (150, 200)
>>>
>>> Please confirm if this understanding is correct. It is not clear how it
>>> will handle overlapping windows (75, 125) and (175, 225) and so on?
>>>
>>> Another case that is not clear is that at, say, t = 102 I get some message
>>> with timestamp 99. What happens then?
>>> Will the result be added to the previous aggregation of (50, 100) or
>>> (75, 125), like it should?
>>>
>>> Or will it recreate the old window (50, 100), aggregate the value there,
>>> and then drop it? This would result in a wrong aggregated value, as it does
>>> not consider the previous aggregated values.
>>>
>>> So this is the pressing case I am not able to understand. Maybe I am
>>> wrong at some basic understanding.
>>>
>>>
>>> Next, for the parameter
>>>> windowstore.changelog.additional.retention.ms
>>>
>>> How does this relate to the retention.ms param of the topic config?
>>> I create internal topic manually using, say, retention.ms=3600000.
>>> In the next release (post kafka_2.10-0.10.0.1), since we support delete of
>>> internal changelog topics as well, I want it to be retained for, say, just
>>> 1 hour.
>>> So how does the above parameter interact with this topic-level setting?
>>> Or do I now just need to set the above config to 3600000 and not add
>>> retention.ms=3600000
>>> while creating the internal topic?
>>> This is just another doubt remaining here.
>>>
>>> Thanks
>>> Sachin
>>>
>>>
>>>
>>> On Tue, Dec 13, 2016 at 3:02 AM, Matthias J. Sax <matth...@confluent.io>
>>> wrote:
>>>
>>>> Sachin,
>>>>
>>>> There is no reason to have an .until() AND a .retain() -- just increase
>>>> the value of .until()
>>>>
>>>> If you have a window of, let's say, 1h size and you set .until() also to
>>>> 1h -- you can obviously not process any late arriving data. If you set
>>>> until() to 2h in this example, you can process data that is up to 1h
>>>> delayed.
>>>>
>>>> So basically, the retention should always be larger than your window
>>>> size.
>>>>
>>>> The parameter
>>>>> windowstore.changelog.additional.retention.ms
>>>>
>>>> applies to changelog topics that back up window state stores. Those
>>>> changelog topics are compacted. However, the key used encodes a
>>>> window ID, and thus older data can never be cleaned up by compaction.
>>>> Therefore, an additional retention time is applied to those topics, too.
>>>> Thus, if an old window is not updated for this amount of time, it will
>>>> eventually get deleted, preventing this topic from growing infinitely.
>>>>
>>>> The value will be determined by until(), i.e., whatever you specify in
>>>> .until() will be used to set this parameter.
>>>>
>>>>
>>>> -Matthias
>>>>
>>>> On 12/12/16 1:07 AM, Sachin Mittal wrote:
>>>>> Hi,
>>>>> We are facing the exact problem as described by Matthias above.
>>>>> We are keeping default until which is 1 day.
>>>>>
>>>>> Our record's timestamp extractor has a field which increases with
>>>>> time. However, for a short time we cannot guarantee that the timestamp
>>>>> always increases. So at the boundary, i.e. after 24 hrs, we can get
>>>>> records which are beyond that window's retention period.
>>>>>
>>>>> Then it happens like it is mentioned above and our aggregation fails.
>>>>>
>>>>> So just to sum up: when we get a record at
>>>>> 24h + 1 sec, it deletes the older window, and since the new record
>>>>> belongs to the new window, it gets created.
>>>>> Now when we get the next record at 24h - 1 sec, since the older window
>>>>> is dropped, it does not get aggregated in that bucket.
>>>>>
>>>>> I suggest we have another setting next to until, called retain, which
>>>>> retains the older windows into the next window.
>>>>>
>>>>> I think at stream window boundary level it should use a concept of
>>>>> sliding window. So we can define a window like
>>>>>
>>>>> TimeWindows.of("test-table", 3600 * 1000L).advanceBy(1800 * 1000L)
>>>>>     .until(7 * 24 * 3600 * 1000L).retain(900 * 1000L)
>>>>>
>>>>> So after 7 days it retains the data covered by windows in the last 15
>>>>> minutes, which rolls over the data in them to the next window. This way
>>>>> streams work continuously.
>>>>>
>>>>> Please let us know your thoughts on this.
>>>>>
>>>>> As a side question on this, there is a setting:
>>>>>
>>>>> windowstore.changelog.additional.retention.ms
>>>>> It is not clear what it does. Is this the default for until?
>>>>>
>>>>> Thanks
>>>>> Sachin
>>>>>
>>>>>
>>>>> On Mon, Dec 12, 2016 at 10:17 AM, Matthias J. Sax <matth...@confluent.io>
>>>>> wrote:
>>>>>
>>>>>> Windows are created on demand, ie, each time a new record arrives and
>>>>>> there is no window yet for it, a new window will get created.
>>>>>>
>>>>>> Windows accept data until their retention time (that you can
>>>>>> configure via .until()) has passed. Thus, you will have many windows
>>>>>> open in parallel.
>>>>>>
>>>>>> If you read older data, they will just be put into the corresponding
>>>>>> windows (as long as the window retention time has not passed). If a
>>>>>> window was discarded already, a new window with this single (later
>>>>>> arriving) record will get created, the computation will be triggered,
>>>>>> you get a result, and afterwards the window is deleted again (as its
>>>>>> retention time has passed already).
>>>>>>
>>>>>> The retention time is driven by "stream-time", an internally tracked
>>>>>> time that only progresses in the forward direction. It gets its value
>>>>>> from the timestamps provided by TimestampExtractor -- thus, by default
>>>>>> it will be event-time.
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>> On 12/11/16 3:47 PM, Jon Yeargers wrote:
>>>>>>> I've read this and still have more questions than answers. If my data
>>>>>>> skips about (timewise), what determines when a given window will start /
>>>>>>> stop accepting new data? What if I'm reading data from some time ago?
>>>>>>>
>>>>>>> On Sun, Dec 11, 2016 at 2:22 PM, Matthias J. Sax <matth...@confluent.io>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Please have a look here:
>>>>>>>>
>>>>>>>> http://docs.confluent.io/current/streams/developer-guide.html#windowing-a-stream
>>>>>>>>
>>>>>>>> If you have further questions, just follow up :)
>>>>>>>>
>>>>>>>>
>>>>>>>> -Matthias
>>>>>>>>
>>>>>>>>
>>>>>>>> On 12/10/16 6:11 PM, Jon Yeargers wrote:
>>>>>>>>> I've added the 'until()' clause to some aggregation steps and it's
>>>>>>>>> working wonders for keeping the size of the state store within useful
>>>>>>>>> boundaries... But I'm not 100% clear on how it works.
>>>>>>>>>
>>>>>>>>> What is implied by the '.until()' clause? What determines when to
>>>>>>>>> stop receiving further data - is it clock time (since the window was
>>>>>>>>> created)? It seems problematic for it to refer to EventTime as this
>>>>>>>>> may bounce all over the place. For non-overlapping windows a given
>>>>>>>>> record can only fall into a single aggregation period - so when would
>>>>>>>>> a value get discarded?
>>>>>>>>>
>>>>>>>>> I'm using 'groupByKey().aggregate(..., TimeWindows.of(60 *
>>>>>>>>> 1000L).until(10 * 1000L))' - but what is this accomplishing?
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>
>>
>>
> 
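
The window bookkeeping discussed in the quoted messages above (size 50,
advance 25, until 100) can be sketched in plain Java. This is only a toy
model, not the Kafka Streams implementation: the class and method names are
made up, window starts are assumed to be aligned to multiples of the advance
interval (so in this model a record with ts=101 lands in [75,125) and
[100,150)), and the expiry rule simply follows the "can be dropped at
TS = 150" reasoning from the thread.

```java
import java.util.ArrayList;
import java.util.List;

public class HoppingWindowSketch {
    static final long SIZE = 50;        // window size
    static final long ADVANCE = 25;     // hop between window starts
    static final long RETENTION = 100;  // value passed to until()

    // Start times of all windows [start, start + SIZE) that contain ts,
    // assuming window starts are multiples of ADVANCE (an assumption of this sketch).
    static List<Long> windowStartsFor(long ts) {
        List<Long> starts = new ArrayList<>();
        long first = Math.max(0, ts - SIZE + ADVANCE) / ADVANCE * ADVANCE;
        for (long start = first; start <= ts; start += ADVANCE) {
            starts.add(start);
        }
        return starts;
    }

    // A window is treated as droppable once its (exclusive) end is no later
    // than streamTime - RETENTION, i.e. no record that is still guaranteed
    // to be processed could fall into it.
    static boolean expired(long windowStart, long streamTime) {
        return windowStart + SIZE <= streamTime - RETENTION;
    }

    public static void main(String[] args) {
        System.out.println("ts=20 -> window starts " + windowStartsFor(20));
        System.out.println("ts=99 -> window starts " + windowStartsFor(99));
        System.out.println("[0,50) expired at stream-time 101? " + expired(0, 101));
        System.out.println("[0,50) expired at stream-time 150? " + expired(0, 150));
    }
}
```

In this model a late record with ts=99 arriving at stream-time 102 is still
added to the existing windows starting at 50 and 75, because neither of them
has expired yet.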
