I agree. We got already multiple request to add an API for specifying
topic parameters for internal topic... I am pretty sure we will add it
if time permits -- feel free to contribute this new feature!

About chancing the value of until: that does not work, as the changelog
topic configuration would not be updated.


-Matthias

On 12/14/16 8:22 PM, Sachin Mittal wrote:
> Hi,
> I suggest to include topic config as well as part of streams config
> properties like we do for producer and consumer configs.
> The topic config supplied would be used for creating internal changelog
> topics along with certain additional configs which are applied by default.
> 
> This way we don't have to ever create internal topics manually.
> 
> I had one doubt regarding until.
> Say I specify one value and run my streams app.
> Now I stop the app, specify different value and re start the app.
> 
> Which value for retain would the old (pre existing) windows use. Would it
> be the older value or the new value?
> 
> Thanks
> Sachin
> 
> 
> 
> On Thu, Dec 15, 2016 at 12:26 AM, Matthias J. Sax <matth...@confluent.io>
> wrote:
> 
>> Understood. Makes sense.
>>
>> For this, you should apply Streams configs manually when creating those
>> topics. For retention parameter, use the value you specify in
>> corresponding .until() method for it.
>>
>>
>> -Matthias
>>
>>
>> On 12/14/16 10:08 AM, Sachin Mittal wrote:
>>> I was referring to internal change log topic. I had to create them
>> manually
>>> because in some case the message size of these topic were greater than
>> the
>>> default ones used by kafka streams.
>>>
>>> I think someone in this group recommended to create these topic
>> manually. I
>>> understand that it is better to have internal topics created by streams
>> app
>>> and I will take a second look at these and see if that can be done.
>>>
>>> I just wanted to make sure what all configs are applied to internal
>> topics
>>> in order to decide to avoid them creating manually.
>>>
>>> Thanks
>>> Sachin
>>>
>>>
>>> On Wed, Dec 14, 2016 at 11:08 PM, Matthias J. Sax <matth...@confluent.io
>>>
>>> wrote:
>>>
>>>> I am wondering about "I create internal topic manually" -- which topics
>>>> do you refer in detail?
>>>>
>>>> Kafka Streams create all kind of internal topics with auto-generated
>>>> names. So it would be quite tricky to create all of them manually
>>>> (especially because you need to know those name in advance).
>>>>
>>>> IRRC, if a topic does exist, Kafka Streams does no change it's
>>>> configuration. Only if Kafka Streams does create a topic, it will
>>>> specify certain config parameters on topic create step.
>>>>
>>>>
>>>> -Matthias
>>>>
>>>>
>>>>
>>>> On 12/13/16 8:16 PM, Sachin Mittal wrote:
>>>>> Hi,
>>>>> Thanks for the explanation. This illustration makes it super easy to
>>>>> understand how until works. Perhaps we can update the wiki with this
>>>>> illustration.
>>>>> It is basically the retention time for a past window.
>>>>> I used to think until creates all the future windows for that period
>> and
>>>>> when time passes that it used to delete all the past windows. However
>>>>> actually until retains a window for specified time. This makes so much
>>>> more
>>>>> sense.
>>>>>
>>>>> I just had one pending query regarding:
>>>>>
>>>>>> windowstore.changelog.additional.retention.ms
>>>>>
>>>>> How does this relate to rentention.ms param of topic config?
>>>>> I create internal topic manually using say rentention.ms=3600000.
>>>>> In next release (post kafka_2.10-0.10.0.1) since we support delete of
>>>>> internal changelog topic as well and I want it to be retained for say
>>>> just
>>>>> 1 hour.
>>>>> So how does that above parameter interfere with this topic level
>> setting.
>>>>> Or now I just need to set above config as 3600000 and not add
>>>>> rentention.ms=3600000
>>>>> while creating internal topic.
>>>>>
>>>>> Thanks
>>>>> Sachin
>>>>>
>>>>>
>>>>> On Tue, Dec 13, 2016 at 11:27 PM, Matthias J. Sax <
>> matth...@confluent.io
>>>>>
>>>>> wrote:
>>>>>
>>>>>> First, windows are only created if there is actual data for a window.
>> So
>>>>>> you get windows [0, 50), [25, 75), [50, 100) only if there are record
>>>>>> falling into each window (btw: window start-time is inclusive while
>>>>>> window end time is exclusive). If you have only 2 record with lets say
>>>>>> ts=20 and ts=90 you will not have an open window [25,75). Each window
>> is
>>>>>> physically created each time the first record for it is processed.
>>>>>>
>>>>>> If you have above 4 windows and a record with ts=101 arrives, a new
>>>>>> window [101,151) will be created. Window [0,50) will not be deleted
>> yet,
>>>>>> because retention is 100 and thus Streams guarantees that all record
>>>>>> with ts >= 1 (= 101 - 100) are still processed correctly and those
>>>>>> records would fall into window [0,50).
>>>>>>
>>>>>> Thus, window [0,50) can be dropped, if time advanced to TS = 150, but
>>>>>> not before that.
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>>
>>>>>> On 12/13/16 12:06 AM, Sachin Mittal wrote:
>>>>>>> Hi,
>>>>>>> So is until for future or past?
>>>>>>> Say I get first record at t = 0 and until is 100 and my window size
>> is
>>>> 50
>>>>>>> advance by 25.
>>>>>>> I understand it will create windows (0, 50), (25, 75), (50, 100)
>>>>>>> Now at t = 101 it will drop
>>>>>>> (0, 50), (25, 75), (50, 100) and create
>>>>>>> (101, 150), (125, 175), (150, 200)
>>>>>>>
>>>>>>> Please confirm if this understanding us correct. It is not clear how
>> it
>>>>>>> will handle overlapping windows (75, 125) and (175, 225) and so on?
>>>>>>>
>>>>>>> What case is not clear again is that at say t = 102 I get some
>> message
>>>>>> with
>>>>>>> timestamp 99. What happens then?
>>>>>>> Will the result added to previous aggregation of (50, 100) or (75,
>>>> 125),
>>>>>>> like it should.
>>>>>>>
>>>>>>> Or it will recreate the old window (50, 100) and aggregate the value
>>>>>> there
>>>>>>> and then drop it. This would result is wrong aggregated value, as it
>>>> does
>>>>>>> not consider the previous aggregated values.
>>>>>>>
>>>>>>> So this is the pressing case I am not able to understand. Maybe I am
>>>>>> wrong
>>>>>>> at some basic understanding.
>>>>>>>
>>>>>>>
>>>>>>> Next for
>>>>>>> The parameter
>>>>>>>> windowstore.changelog.additional.retention.ms
>>>>>>>
>>>>>>> How does this relate to rentention.ms param of topic config?
>>>>>>> I create internal topic manually using say rentention.ms=3600000.
>>>>>>> In next release (post kafka_2.10-0.10.0.1) since we support delete of
>>>>>>> internal changelog topic as well and I want it to be retained for say
>>>>>> just
>>>>>>> 1 hour.
>>>>>>> So how does that above parameter interfere with this topic level
>>>> setting.
>>>>>>> Or now I just need to set above config as 3600000 and not add
>>>>>>> rentention.ms=3600000
>>>>>>> while creating internal topic.
>>>>>>> This is just another doubt remaining here.
>>>>>>>
>>>>>>> Thanks
>>>>>>> Sachin
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Dec 13, 2016 at 3:02 AM, Matthias J. Sax <
>>>> matth...@confluent.io>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Sachin,
>>>>>>>>
>>>>>>>> There is no reason to have an .until() AND a .retain() -- just
>>>> increase
>>>>>>>> the value of .until()
>>>>>>>>
>>>>>>>> If you have a window of let's say 1h size and you set .until() also
>> to
>>>>>>>> 1h -- you can obviously not process any late arriving data. If you
>> set
>>>>>>>> until() to 2h is this example, you can process data that is up to 1h
>>>>>>>> delayed.
>>>>>>>>
>>>>>>>> So basically, the retention should always be larger than you window
>>>>>> size.
>>>>>>>>
>>>>>>>> The parameter
>>>>>>>>> windowstore.changelog.additional.retention.ms
>>>>>>>>
>>>>>>>> is applies to changelog topics that backup window state stores.
>> Those
>>>>>>>> changelog topics are compacted. However, the used key does encode an
>>>>>>>> window ID and thus older data can never be cleaned up by compaction.
>>>>>>>> Therefore, an additional retention time is applied to those topics,
>>>> too.
>>>>>>>> Thus, if an old window is not updated for this amount of time, it
>> will
>>>>>>>> get deleted eventually preventing this topic to grown infinitely.
>>>>>>>>
>>>>>>>> The value will be determined by until(), i.e., whatever you specify
>> in
>>>>>>>> .until() will be used to set this parameter.
>>>>>>>>
>>>>>>>>
>>>>>>>> -Matthias
>>>>>>>>
>>>>>>>> On 12/12/16 1:07 AM, Sachin Mittal wrote:
>>>>>>>>> Hi,
>>>>>>>>> We are facing the exact problem as described by Matthias above.
>>>>>>>>> We are keeping default until which is 1 day.
>>>>>>>>>
>>>>>>>>> Our record's times tamp extractor has a field which increases with
>>>>>> time.
>>>>>>>>> However for short time we cannot guarantee the time stamp is always
>>>>>>>>> increases. So at the boundary ie after 24 hrs we can get records
>>>> which
>>>>>>>> are
>>>>>>>>> beyond that windows retention period.
>>>>>>>>>
>>>>>>>>> Then it happens like it is mentioned above and our aggregation
>> fails.
>>>>>>>>>
>>>>>>>>> So just to sum up when we get record
>>>>>>>>> 24h + 1 sec (it deletes older window and since the new record
>> belongs
>>>>>> to
>>>>>>>>> the new window its gets created)
>>>>>>>>> Now when we get next record of 24 hs - 1 sec since older window is
>>>>>>>> dropped
>>>>>>>>> it does not get aggregated in that bucket.
>>>>>>>>>
>>>>>>>>> I suggest we have another setting next to until call retain which
>>>>>> retains
>>>>>>>>> the older windows into next window.
>>>>>>>>>
>>>>>>>>> I think at stream window boundary level it should use a concept of
>>>>>>>> sliding
>>>>>>>>> window. So we can define window like
>>>>>>>>>
>>>>>>>>> TimeWindows.of("test-table", 3600 * 1000l).advanceBy(1800 *
>>>>>>>> 1000l).untill(7
>>>>>>>>> * 24 * 3600 * 1000l).retain(900 * 1000l)
>>>>>>>>>
>>>>>>>>> So after 7 days it retains the data covered by windows in last 15
>>>>>> minutes
>>>>>>>>> which rolls over the data in them to next window. This way streams
>>>> work
>>>>>>>>> continuously.
>>>>>>>>>
>>>>>>>>> Please let us know your thoughts on this.
>>>>>>>>>
>>>>>>>>> On another side question on this there is a setting:
>>>>>>>>>
>>>>>>>>> windowstore.changelog.additional.retention.ms
>>>>>>>>> I is not clear what is does. Is this the default for until?
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>> Sachin
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Mon, Dec 12, 2016 at 10:17 AM, Matthias J. Sax <
>>>>>> matth...@confluent.io
>>>>>>>>>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Windows are created on demand, ie, each time a new record arrives
>>>> and
>>>>>>>>>> there is no window yet for it, a new window will get created.
>>>>>>>>>>
>>>>>>>>>> Windows are accepting data until their retention time (that you
>> can
>>>>>>>>>> configure via .until()) passed. Thus, you will have many windows
>>>> being
>>>>>>>>>> open in parallel.
>>>>>>>>>>
>>>>>>>>>> If you read older data, they will just be put into the
>> corresponding
>>>>>>>>>> windows (as long as window retention time did not pass). If a
>> window
>>>>>> was
>>>>>>>>>> discarded already, a new window with this single (later arriving)
>>>>>> record
>>>>>>>>>> will get created, the computation will be triggered, you get a
>>>> result,
>>>>>>>>>> and afterwards the window is deleted again (as it's retention time
>>>>>>>>>> passed already).
>>>>>>>>>>
>>>>>>>>>> The retention time is driven by "stream-time", in internal tracked
>>>>>> time
>>>>>>>>>> that only progressed in forward direction. It gets it value from
>> the
>>>>>>>>>> timestamps provided by TimestampExtractor -- thus, per default it
>>>> will
>>>>>>>>>> be event-time.
>>>>>>>>>>
>>>>>>>>>> -Matthias
>>>>>>>>>>
>>>>>>>>>> On 12/11/16 3:47 PM, Jon Yeargers wrote:
>>>>>>>>>>> I've read this and still have more questions than answers. If my
>>>> data
>>>>>>>>>> skips
>>>>>>>>>>> about (timewise) what determines when a given window will start /
>>>>>> stop
>>>>>>>>>>> accepting new data? What if Im reading data from some time ago?
>>>>>>>>>>>
>>>>>>>>>>> On Sun, Dec 11, 2016 at 2:22 PM, Matthias J. Sax <
>>>>>>>> matth...@confluent.io>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Please have a look here:
>>>>>>>>>>>>
>>>>>>>>>>>> http://docs.confluent.io/current/streams/developer-
>>>>>>>>>>>> guide.html#windowing-a-stream
>>>>>>>>>>>>
>>>>>>>>>>>> If you have further question, just follow up :)
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On 12/10/16 6:11 PM, Jon Yeargers wrote:
>>>>>>>>>>>>> Ive added the 'until()' clause to some aggregation steps and
>> it's
>>>>>>>>>> working
>>>>>>>>>>>>> wonders for keeping the size of the state store in useful
>>>>>>>> boundaries...
>>>>>>>>>>>> But
>>>>>>>>>>>>> Im not 100% clear on how it works.
>>>>>>>>>>>>>
>>>>>>>>>>>>> What is implied by the '.until()' clause? What determines when
>> to
>>>>>>>> stop
>>>>>>>>>>>>> receiving further data - is it clock time (since the window was
>>>>>>>>>> created)?
>>>>>>>>>>>>> It seems problematic for it to refer to EventTime as this may
>>>>>> bounce
>>>>>>>>>> all
>>>>>>>>>>>>> over the place. For non-overlapping windows a given record can
>>>> only
>>>>>>>>>> fall
>>>>>>>>>>>>> into a single aggregation period - so when would a value get
>>>>>>>> discarded?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Im using 'groupByKey(),aggregate(..., TimeWindows.of(60 *
>>>>>>>>>>>> 1000L).until(10 *
>>>>>>>>>>>>> 1000L))'  - but what is this accomplishing?
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>
>>
>>
> 

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to