Understood. Makes sense.

For this, you should apply the Streams configs manually when creating those
topics. For the retention parameter, use the value you specify in the
corresponding .until() call.

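As a rough sketch of what "apply the configs manually" could look like, the
snippet below only builds the topic-level settings as plain strings; it does
not talk to a broker. The class and method names are hypothetical. Two things
here are assumptions rather than something confirmed in this thread: that the
changelog topic uses cleanup.policy=compact,delete, and that the
windowstore.changelog.additional.retention.ms value is added on top of the
.until() value. Pass 0 for the additional part to use the .until() value
as-is.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of Kafka Streams: computes topic-level
// settings for a manually created window-store changelog topic.
class ChangelogTopicConfigSketch {

    // ASSUMPTION: retention = value passed to .until() plus the additional
    // retention. Use additionalRetentionMs = 0 to get the .until() value only.
    static long retentionMs(long untilMs, long additionalRetentionMs) {
        return Math.addExact(untilMs, additionalRetentionMs);
    }

    static Map<String, String> topicConfig(long untilMs, long additionalRetentionMs) {
        Map<String, String> config = new LinkedHashMap<>();
        // ASSUMPTION: compaction plus time-based deletion, so that old
        // windows are eventually removed even though compaction alone
        // never cleans up their keys.
        config.put("cleanup.policy", "compact,delete");
        config.put("retention.ms",
                Long.toString(retentionMs(untilMs, additionalRetentionMs)));
        return config;
    }

    public static void main(String[] args) {
        long untilMs = 60L * 60L * 1000L; // 1 hour, as in the question
        topicConfig(untilMs, 0L)
                .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The resulting key=value pairs are what you would hand to whatever tool you
use to create the topic.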

-Matthias


On 12/14/16 10:08 AM, Sachin Mittal wrote:
> I was referring to the internal changelog topics. I had to create them
> manually because in some cases the message size for these topics was
> greater than the default used by Kafka Streams.
> 
> I think someone in this group recommended creating these topics manually.
> I understand that it is better to have internal topics created by the
> Streams app; I will take a second look and see whether that can be done.
> 
> I just wanted to know which configs are applied to internal topics, so
> that I can decide whether to stop creating them manually.
> 
> Thanks
> Sachin
> 
> 
> On Wed, Dec 14, 2016 at 11:08 PM, Matthias J. Sax <matth...@confluent.io>
> wrote:
> 
>> I am wondering about "I create internal topic manually" -- which topics
>> exactly are you referring to?
>>
>> Kafka Streams creates all kinds of internal topics with auto-generated
>> names, so it would be quite tricky to create all of them manually
>> (especially because you need to know those names in advance).
>>
>> IIRC, if a topic already exists, Kafka Streams does not change its
>> configuration. Only if Kafka Streams creates a topic itself will it
>> specify certain config parameters in the topic-creation step.
>>
>>
>> -Matthias
>>
>>
>>
>> On 12/13/16 8:16 PM, Sachin Mittal wrote:
>>> Hi,
>>> Thanks for the explanation. This illustration makes it super easy to
>>> understand how until works. Perhaps we can update the wiki with this
>>> illustration.
>>> It is basically the retention time for a past window.
>>> I used to think that until creates all the future windows for that period
>>> and, once that time passes, deletes all the past windows. However, until
>>> actually retains a window for the specified time. This makes so much more
>>> sense.
>>>
>>> I just had one pending query regarding:
>>>
>>>> windowstore.changelog.additional.retention.ms
>>>
>>> How does this relate to the retention.ms param of the topic config?
>>> I create the internal topic manually using, say, retention.ms=3600000.
>>> In the next release (post kafka_2.10-0.10.0.1), since deletion on internal
>>> changelog topics is supported as well, I want it to be retained for, say,
>>> just 1 hour.
>>> So how does the above parameter interact with this topic-level setting?
>>> Or do I now just need to set the above config to 3600000 and not add
>>> retention.ms=3600000 while creating the internal topic?
>>>
>>> Thanks
>>> Sachin
>>>
>>>
>>> On Tue, Dec 13, 2016 at 11:27 PM, Matthias J. Sax <matth...@confluent.io
>>>
>>> wrote:
>>>
>>>> First, windows are only created if there is actual data for a window. So
>>>> you get windows [0, 50), [25, 75), [50, 100) only if there are records
>>>> falling into each window (btw: window start time is inclusive while
>>>> window end time is exclusive). If you have only 2 records with, let's say,
>>>> ts=20 and ts=90, you will not have an open window [25,75). Each window is
>>>> physically created when the first record for it is processed.
>>>>
>>>> If you have the above windows and a record with ts=101 arrives, a new
>>>> window [101,151) will be created. Window [0,50) will not be deleted yet,
>>>> because retention is 100 and thus Streams guarantees that all records
>>>> with ts >= 1 (= 101 - 100) are still processed correctly, and those
>>>> records would fall into window [0,50).
>>>>
>>>> Thus, window [0,50) can be dropped once time has advanced to TS = 150, but
>>>> not before that.
>>>>
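The arithmetic in the explanation above (window size 50, advance 25,
retention 100) can be sketched in a few lines of plain Java. This is only an
illustration of the reasoning, not the Kafka Streams implementation; the
class and method names are hypothetical. It assumes window start times are
multiples of the advance interval beginning at 0, and it takes the drop rule
as stated above: a window may go once no timestamp that still has to be
accepted (ts >= stream time - retention) can fall into it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of hopping-window bookkeeping.
class HoppingWindowSketch {

    // Start times of every window [start, start + size) that contains ts.
    // ASSUMPTION: starts are aligned to multiples of `advance`, from 0.
    static List<Long> windowStartsFor(long ts, long size, long advance) {
        List<Long> starts = new ArrayList<>();
        long earliest = Math.max(0L, ts - size + advance);
        long first = (earliest / advance) * advance;
        for (long start = first; start <= ts; start += advance) {
            starts.add(start);
        }
        return starts;
    }

    // True once every record that must still be accepted
    // (ts >= streamTime - retention) lies at or after the window end,
    // which is exclusive.
    static boolean canDrop(long windowEnd, long retention, long streamTime) {
        return streamTime - retention >= windowEnd;
    }

    public static void main(String[] args) {
        long size = 50L, advance = 25L, retention = 100L;
        for (long ts : new long[] {20L, 90L}) {
            System.out.println("ts=" + ts + " -> window starts "
                    + windowStartsFor(ts, size, advance));
        }
        System.out.println("drop [0,50) at stream time 101? "
                + canDrop(50L, retention, 101L));
        System.out.println("drop [0,50) at stream time 150? "
                + canDrop(50L, retention, 150L));
    }
}
```

With records at ts=20 and ts=90 only, no window starting at 25 is produced,
and window [0,50) stays until stream time reaches 150.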
>>>> -Matthias
>>>>
>>>>
>>>> On 12/13/16 12:06 AM, Sachin Mittal wrote:
>>>>> Hi,
>>>>> So is until for the future or the past?
>>>>> Say I get the first record at t = 0, until is 100, and my window size is
>>>>> 50, advancing by 25.
>>>>> I understand it will create windows (0, 50), (25, 75), (50, 100).
>>>>> Now at t = 101 it will drop
>>>>> (0, 50), (25, 75), (50, 100) and create
>>>>> (101, 150), (125, 175), (150, 200).
>>>>>
>>>>> Please confirm whether this understanding is correct. It is not clear how
>>>>> it will handle overlapping windows (75, 125) and (175, 225) and so on.
>>>>>
>>>>> Another case that is not clear: say at t = 102 I get a message with
>>>>> timestamp 99. What happens then?
>>>>> Will the result be added to the previous aggregation of (50, 100) or
>>>>> (75, 125), as it should?
>>>>>
>>>>> Or will it recreate the old window (50, 100), aggregate the value there,
>>>>> and then drop it? This would result in a wrong aggregated value, as it
>>>>> does not consider the previously aggregated values.
>>>>>
>>>>> So this is the pressing case I am not able to understand. Maybe I am
>>>>> wrong in some basic understanding.
>>>>>
>>>>>
>>>>> Next, for the parameter
>>>>>> windowstore.changelog.additional.retention.ms
>>>>>
>>>>> How does this relate to the retention.ms param of the topic config?
>>>>> I create the internal topic manually using, say, retention.ms=3600000.
>>>>> In the next release (post kafka_2.10-0.10.0.1), since deletion on internal
>>>>> changelog topics is supported as well, I want it to be retained for, say,
>>>>> just 1 hour.
>>>>> So how does the above parameter interact with this topic-level setting?
>>>>> Or do I now just need to set the above config to 3600000 and not add
>>>>> retention.ms=3600000 while creating the internal topic?
>>>>> This is just another doubt remaining here.
>>>>>
>>>>> Thanks
>>>>> Sachin
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Dec 13, 2016 at 3:02 AM, Matthias J. Sax <
>> matth...@confluent.io>
>>>>> wrote:
>>>>>
>>>>>> Sachin,
>>>>>>
>>>>>> There is no reason to have an .until() AND a .retain() -- just increase
>>>>>> the value of .until().
>>>>>>
>>>>>> If you have a window of, let's say, 1h size and you set .until() also to
>>>>>> 1h, you obviously cannot process any late-arriving data. If you set
>>>>>> .until() to 2h in this example, you can process data that is up to 1h
>>>>>> delayed.
>>>>>>
>>>>>> So basically, the retention should always be larger than your window
>>>>>> size.
>>>>>>
>>>>>> The parameter
>>>>>>> windowstore.changelog.additional.retention.ms
>>>>>>
>>>>>> applies to changelog topics that back up window state stores. Those
>>>>>> changelog topics are compacted. However, the key encodes a window ID,
>>>>>> and thus older data can never be cleaned up by compaction.
>>>>>> Therefore, an additional retention time is applied to those topics, too.
>>>>>> Thus, if an old window is not updated for this amount of time, it will
>>>>>> eventually get deleted, preventing the topic from growing indefinitely.
>>>>>>
>>>>>> The value will be determined by until(), i.e., whatever you specify in
>>>>>> .until() will be used to set this parameter.
>>>>>>
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>> On 12/12/16 1:07 AM, Sachin Mittal wrote:
>>>>>>> Hi,
>>>>>>> We are facing the exact problem described by Matthias above.
>>>>>>> We are keeping the default until, which is 1 day.
>>>>>>>
>>>>>>> Our record's timestamp extractor has a field which increases with time.
>>>>>>> However, over short periods we cannot guarantee that the timestamp
>>>>>>> always increases. So at the boundary, i.e., after 24 hrs, we can get
>>>>>>> records which are beyond that window's retention period.
>>>>>>>
>>>>>>> Then it happens as mentioned above and our aggregation fails.
>>>>>>>
>>>>>>> So just to sum up: when we get a record at
>>>>>>> 24h + 1 sec, it deletes the older window, and since the new record
>>>>>>> belongs to the new window, that window gets created.
>>>>>>> Now when we get the next record at 24h - 1 sec, since the older window
>>>>>>> has been dropped, it does not get aggregated in that bucket.
>>>>>>>
>>>>>>> I suggest we have another setting next to until, called retain, which
>>>>>>> retains the older windows into the next window.
>>>>>>>
>>>>>>> I think at the stream window boundary level it should use a concept of
>>>>>>> sliding window. So we can define a window like
>>>>>>>
>>>>>>> TimeWindows.of("test-table", 3600 * 1000L).advanceBy(1800 * 1000L)
>>>>>>>     .until(7 * 24 * 3600 * 1000L).retain(900 * 1000L)
>>>>>>>
>>>>>>> So after 7 days it retains the data covered by windows in the last 15
>>>>>>> minutes, which rolls the data in them over to the next window. This way
>>>>>>> streams work continuously.
>>>>>>>
>>>>>>> Please let us know your thoughts on this.
>>>>>>>
>>>>>>> As a side question, there is a setting:
>>>>>>>
>>>>>>> windowstore.changelog.additional.retention.ms
>>>>>>> It is not clear what it does. Is this the default for until?
>>>>>>>
>>>>>>> Thanks
>>>>>>> Sachin
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Dec 12, 2016 at 10:17 AM, Matthias J. Sax <
>>>> matth...@confluent.io
>>>>>>>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Windows are created on demand, i.e., each time a new record arrives and
>>>>>>>> there is no window yet for it, a new window will get created.
>>>>>>>>
>>>>>>>> Windows accept data until their retention time (which you can
>>>>>>>> configure via .until()) has passed. Thus, you will have many windows
>>>>>>>> open in parallel.
>>>>>>>>
>>>>>>>> If you read older data, it will just be put into the corresponding
>>>>>>>> windows (as long as the window retention time has not passed). If a
>>>>>>>> window was discarded already, a new window with this single
>>>>>>>> (late-arriving) record will get created, the computation will be
>>>>>>>> triggered, you get a result, and afterwards the window is deleted
>>>>>>>> again (as its retention time has passed already).
>>>>>>>>
>>>>>>>> The retention time is driven by "stream-time", an internally tracked
>>>>>>>> time that only progresses in the forward direction. It gets its value
>>>>>>>> from the timestamps provided by TimestampExtractor -- thus, by default
>>>>>>>> it will be event-time.
>>>>>>>>
>>>>>>>> -Matthias
>>>>>>>>
>>>>>>>> On 12/11/16 3:47 PM, Jon Yeargers wrote:
>>>>>>>>> I've read this and still have more questions than answers. If my data
>>>>>>>>> skips about (timewise), what determines when a given window will
>>>>>>>>> start / stop accepting new data? What if I'm reading data from some
>>>>>>>>> time ago?
>>>>>>>>>
>>>>>>>>> On Sun, Dec 11, 2016 at 2:22 PM, Matthias J. Sax <
>>>>>> matth...@confluent.io>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Please have a look here:
>>>>>>>>>>
>>>>>>>>>> http://docs.confluent.io/current/streams/developer-guide.html#windowing-a-stream
>>>>>>>>>>
>>>>>>>>>> If you have further question, just follow up :)
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> -Matthias
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On 12/10/16 6:11 PM, Jon Yeargers wrote:
>>>>>>>>>>> I've added the 'until()' clause to some aggregation steps and it's
>>>>>>>>>>> working wonders for keeping the size of the state store within
>>>>>>>>>>> useful boundaries... But I'm not 100% clear on how it works.
>>>>>>>>>>>
>>>>>>>>>>> What is implied by the '.until()' clause? What determines when to
>>>>>>>>>>> stop receiving further data - is it clock time (since the window
>>>>>>>>>>> was created)? It seems problematic for it to refer to EventTime, as
>>>>>>>>>>> this may bounce all over the place. For non-overlapping windows a
>>>>>>>>>>> given record can only fall into a single aggregation period - so
>>>>>>>>>>> when would a value get discarded?
>>>>>>>>>>>
>>>>>>>>>>> I'm using 'groupByKey().aggregate(..., TimeWindows.of(60 *
>>>>>>>>>>> 1000L).until(10 * 1000L))' - but what is this accomplishing?
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>
>>
>>
> 
