I agree. We have already gotten multiple requests to add an API for
specifying topic parameters for internal topics... I am pretty sure we
will add it if time permits -- feel free to contribute this new feature!
About changing the value of until: that does not work, as the changelog
topic configuration would not be updated.

-Matthias

On 12/14/16 8:22 PM, Sachin Mittal wrote:
> Hi,
> I suggest including topic config as part of the streams config
> properties, like we do for producer and consumer configs. The topic
> config supplied would be used for creating internal changelog topics,
> along with certain additional configs which are applied by default.
>
> This way we never have to create internal topics manually.
>
> I had one doubt regarding until.
> Say I specify one value and run my streams app.
> Now I stop the app, specify a different value, and restart the app.
>
> Which retention value would the old (pre-existing) windows use: the
> older value or the new one?
>
> Thanks
> Sachin
>
> On Thu, Dec 15, 2016 at 12:26 AM, Matthias J. Sax
> <matth...@confluent.io> wrote:
>
>> Understood. Makes sense.
>>
>> For this, you should apply the Streams configs manually when creating
>> those topics. For the retention parameter, use the value you specify
>> in the corresponding .until() method.
>>
>> -Matthias
>>
>> On 12/14/16 10:08 AM, Sachin Mittal wrote:
>>> I was referring to the internal changelog topics. I had to create
>>> them manually because in some cases the message size of these topics
>>> was greater than the defaults used by Kafka Streams.
>>>
>>> I think someone in this group recommended creating these topics
>>> manually. I understand that it is better to have internal topics
>>> created by the streams app, and I will take a second look and see if
>>> that can be done.
>>>
>>> I just wanted to know which configs are applied to internal topics,
>>> in order to decide whether to avoid creating them manually.
>>>
>>> Thanks
>>> Sachin
>>>
>>> On Wed, Dec 14, 2016 at 11:08 PM, Matthias J.
>>> Sax <matth...@confluent.io> wrote:
>>>
>>>> I am wondering about "I create internal topics manually" -- which
>>>> topics do you refer to in detail?
>>>>
>>>> Kafka Streams creates all kinds of internal topics with
>>>> auto-generated names. So it would be quite tricky to create all of
>>>> them manually (especially because you need to know those names in
>>>> advance).
>>>>
>>>> IIRC, if a topic already exists, Kafka Streams does not change its
>>>> configuration. Only if Kafka Streams creates a topic itself does it
>>>> specify certain config parameters at topic-creation time.
>>>>
>>>> -Matthias
>>>>
>>>> On 12/13/16 8:16 PM, Sachin Mittal wrote:
>>>>> Hi,
>>>>> Thanks for the explanation. This illustration makes it super easy
>>>>> to understand how until works. Perhaps we can update the wiki with
>>>>> this illustration. It is basically the retention time for a past
>>>>> window. I used to think until creates all the future windows for
>>>>> that period and, when time passes it, deletes all the past
>>>>> windows. However, until actually retains a window for the
>>>>> specified time. This makes so much more sense.
>>>>>
>>>>> I just had one pending query regarding:
>>>>>
>>>>>> windowstore.changelog.additional.retention.ms
>>>>>
>>>>> How does this relate to the retention.ms param of the topic
>>>>> config? I create internal topics manually using, say,
>>>>> retention.ms=3600000. In the next release (post
>>>>> kafka_2.10-0.10.0.1), since we support delete for internal
>>>>> changelog topics as well, I want them to be retained for just 1
>>>>> hour. So how does the above parameter interact with this
>>>>> topic-level setting? Or do I now just need to set the above config
>>>>> to 3600000 and not add retention.ms=3600000 when creating the
>>>>> internal topic?
>>>>>
>>>>> Thanks
>>>>> Sachin
>>>>>
>>>>> On Tue, Dec 13, 2016 at 11:27 PM, Matthias J.
>>>>> Sax <matth...@confluent.io> wrote:
>>>>>
>>>>>> First, windows are only created if there is actual data for a
>>>>>> window. So you get windows [0, 50), [25, 75), [50, 100) only if
>>>>>> there are records falling into each window (btw: the window start
>>>>>> time is inclusive while the window end time is exclusive). If you
>>>>>> have only 2 records with, let's say, ts=20 and ts=90, you will
>>>>>> not have an open window [25,75). Each window is physically
>>>>>> created the first time a record for it is processed.
>>>>>>
>>>>>> If you have the above 4 windows and a record with ts=101 arrives,
>>>>>> a new window [101,151) will be created. Window [0,50) will not be
>>>>>> deleted yet, because retention is 100 and thus Streams guarantees
>>>>>> that all records with ts >= 1 (= 101 - 100) are still processed
>>>>>> correctly, and those records would fall into window [0,50).
>>>>>>
>>>>>> Thus, window [0,50) can be dropped once time advances to
>>>>>> TS = 150, but not before that.
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>> On 12/13/16 12:06 AM, Sachin Mittal wrote:
>>>>>>> Hi,
>>>>>>> So is until for the future or the past?
>>>>>>> Say I get the first record at t = 0, until is 100, and my window
>>>>>>> size is 50, advancing by 25.
>>>>>>> I understand it will create windows (0, 50), (25, 75), (50, 100).
>>>>>>> Now at t = 101 it will drop
>>>>>>> (0, 50), (25, 75), (50, 100) and create
>>>>>>> (101, 150), (125, 175), (150, 200).
>>>>>>>
>>>>>>> Please confirm if this understanding is correct. It is not clear
>>>>>>> how it will handle the overlapping windows (75, 125), (175, 225),
>>>>>>> and so on.
>>>>>>>
>>>>>>> The case that is still not clear: say at t = 102 I get some
>>>>>>> message with timestamp 99. What happens then?
>>>>>>> Will the result be added to the previous aggregation of (50, 100)
>>>>>>> or (75, 125), like it should?
>>>>>>>
>>>>>>> Or will it recreate the old window (50, 100), aggregate the
>>>>>>> value there, and then drop it?
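Matthias's hopping-window arithmetic above (size 50, advance 25,
inclusive start time, exclusive end time) can be sketched in plain
Java. The class and method names below are made up for illustration,
and there is no Kafka dependency -- this is just the window-membership
math:

```java
import java.util.ArrayList;
import java.util.List;

public class HoppingWindows {
    // Returns the start times of all windows [start, start + size)
    // that contain the given timestamp, for a hopping window with the
    // given size and advance. Window starts are multiples of 'advance'
    // and never negative.
    static List<Long> windowStartsFor(long ts, long size, long advance) {
        List<Long> starts = new ArrayList<>();
        // earliest window start that could still contain ts
        long first = Math.max(0, ts - size + 1);
        // round up to the next multiple of 'advance'
        long start = (first + advance - 1) / advance * advance;
        for (; start <= ts; start += advance) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // ts=20 falls only into [0,50)
        System.out.println(windowStartsFor(20, 50, 25)); // [0]
        // ts=90 falls into [50,100) and [75,125)
        System.out.println(windowStartsFor(90, 50, 25)); // [50, 75]
    }
}
```

This also answers the overlapping-window question: a single record with
ts=90 updates both (50, 100) and (75, 125), because each window is
tracked independently.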
>>>>>>> This would result in a wrong aggregated value, as it does not
>>>>>>> consider the previously aggregated values.
>>>>>>>
>>>>>>> So this is the pressing case I am not able to understand. Maybe
>>>>>>> I am wrong in some basic understanding.
>>>>>>>
>>>>>>> Next, for the parameter
>>>>>>>> windowstore.changelog.additional.retention.ms
>>>>>>>
>>>>>>> How does this relate to the retention.ms param of the topic
>>>>>>> config? I create internal topics manually using, say,
>>>>>>> retention.ms=3600000. In the next release (post
>>>>>>> kafka_2.10-0.10.0.1), since we support delete for internal
>>>>>>> changelog topics as well, I want them to be retained for just 1
>>>>>>> hour. So how does the above parameter interact with this
>>>>>>> topic-level setting? Or do I now just need to set the above
>>>>>>> config to 3600000 and not add retention.ms=3600000 when creating
>>>>>>> the internal topic? This is just another doubt remaining here.
>>>>>>>
>>>>>>> Thanks
>>>>>>> Sachin
>>>>>>>
>>>>>>> On Tue, Dec 13, 2016 at 3:02 AM, Matthias J. Sax
>>>>>>> <matth...@confluent.io> wrote:
>>>>>>>
>>>>>>>> Sachin,
>>>>>>>>
>>>>>>>> There is no reason to have an .until() AND a .retain() -- just
>>>>>>>> increase the value of .until().
>>>>>>>>
>>>>>>>> If you have a window of, let's say, 1h size and you set
>>>>>>>> .until() also to 1h, you can obviously not process any
>>>>>>>> late-arriving data. If you set until() to 2h in this example,
>>>>>>>> you can process data that is up to 1h delayed.
>>>>>>>>
>>>>>>>> So basically, the retention should always be larger than your
>>>>>>>> window size.
>>>>>>>>
>>>>>>>> The parameter
>>>>>>>>> windowstore.changelog.additional.retention.ms
>>>>>>>>
>>>>>>>> applies to changelog topics that back up window state stores.
>>>>>>>> Those changelog topics are compacted. However, the key used
>>>>>>>> encodes a window ID, and thus older data can never be cleaned
>>>>>>>> up by compaction.
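The drop rule Matthias describes earlier in the thread -- a window such
as [0,50) with retention 100 is kept at stream time 101 but becomes
droppable once stream time reaches 150 -- can be written out as a small
check. The class and method names are hypothetical; there is no Kafka
dependency:

```java
public class WindowRetention {
    // A window [start, start + size) is expired once stream time has
    // advanced far enough that any record still covered by the
    // retention period could no longer fall into it, i.e. once
    // windowEnd <= streamTime - retention.
    static boolean isExpired(long windowStart, long size,
                             long retention, long streamTime) {
        return windowStart + size <= streamTime - retention;
    }

    public static void main(String[] args) {
        // retention 100, window [0,50):
        // still kept at stream time 101, droppable at stream time 150
        System.out.println(isExpired(0, 50, 100, 101)); // false
        System.out.println(isExpired(0, 50, 100, 150)); // true
    }
}
```

This is why a late record with timestamp 99 arriving at stream time 102
still updates the old windows: with retention 100 they are nowhere near
expiry yet.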
>>>>>>>> Therefore, an additional retention time is applied to those
>>>>>>>> topics, too. Thus, if an old window is not updated for this
>>>>>>>> amount of time, it will eventually get deleted, preventing the
>>>>>>>> topic from growing infinitely.
>>>>>>>>
>>>>>>>> The value is determined by until(), i.e., whatever you specify
>>>>>>>> in .until() will be used to set this parameter.
>>>>>>>>
>>>>>>>> -Matthias
>>>>>>>>
>>>>>>>> On 12/12/16 1:07 AM, Sachin Mittal wrote:
>>>>>>>>> Hi,
>>>>>>>>> We are facing the exact problem described by Matthias above.
>>>>>>>>> We are keeping the default until, which is 1 day.
>>>>>>>>>
>>>>>>>>> Our record's timestamp extractor reads a field which increases
>>>>>>>>> with time. However, over short periods we cannot guarantee the
>>>>>>>>> timestamp is always increasing. So at the boundary, i.e. after
>>>>>>>>> 24 hrs, we can get records which are beyond that window's
>>>>>>>>> retention period.
>>>>>>>>>
>>>>>>>>> Then it happens as described above and our aggregation fails.
>>>>>>>>>
>>>>>>>>> So just to sum up: when we get a record at 24h + 1 sec, it
>>>>>>>>> deletes the older window and, since the new record belongs to
>>>>>>>>> the new window, that window gets created. Now when we get the
>>>>>>>>> next record at 24h - 1 sec, since the older window was
>>>>>>>>> dropped, it does not get aggregated in that bucket.
>>>>>>>>>
>>>>>>>>> I suggest we have another setting next to until, called
>>>>>>>>> retain, which retains the older windows into the next window.
>>>>>>>>>
>>>>>>>>> I think at the stream window boundary level it should use the
>>>>>>>>> concept of a sliding window.
>>>>>>>>> So we can define a window like
>>>>>>>>>
>>>>>>>>> TimeWindows.of("test-table", 3600 * 1000L).advanceBy(1800 *
>>>>>>>>> 1000L).until(7 * 24 * 3600 * 1000L).retain(900 * 1000L)
>>>>>>>>>
>>>>>>>>> So after 7 days it retains the data covered by the windows of
>>>>>>>>> the last 15 minutes, which rolls the data in them over to the
>>>>>>>>> next window. This way the streams work continuously.
>>>>>>>>>
>>>>>>>>> Please let us know your thoughts on this.
>>>>>>>>>
>>>>>>>>> On another side question, there is a setting:
>>>>>>>>>
>>>>>>>>> windowstore.changelog.additional.retention.ms
>>>>>>>>>
>>>>>>>>> It is not clear what it does. Is this the default for until?
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>> Sachin
>>>>>>>>>
>>>>>>>>> On Mon, Dec 12, 2016 at 10:17 AM, Matthias J. Sax
>>>>>>>>> <matth...@confluent.io> wrote:
>>>>>>>>>
>>>>>>>>>> Windows are created on demand, i.e., each time a new record
>>>>>>>>>> arrives and there is no window for it yet, a new window will
>>>>>>>>>> get created.
>>>>>>>>>>
>>>>>>>>>> Windows accept data until their retention time (which you can
>>>>>>>>>> configure via .until()) has passed. Thus, you will have many
>>>>>>>>>> windows open in parallel.
>>>>>>>>>>
>>>>>>>>>> If you read older data, it will just be put into the
>>>>>>>>>> corresponding windows (as long as the window retention time
>>>>>>>>>> has not passed). If a window was already discarded, a new
>>>>>>>>>> window with this single (late-arriving) record will get
>>>>>>>>>> created, the computation will be triggered, you get a result,
>>>>>>>>>> and afterwards the window is deleted again (as its retention
>>>>>>>>>> time has already passed).
>>>>>>>>>>
>>>>>>>>>> The retention time is driven by "stream-time", an internally
>>>>>>>>>> tracked time that only progresses in the forward direction.
>>>>>>>>>> It gets its value from the timestamps provided by the
>>>>>>>>>> TimestampExtractor -- thus, by default it will be event-time.
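The forward-only "stream-time" described above can be sketched as a
one-field tracker: it is simply the maximum record timestamp seen so
far, so a late record never moves it backwards. The class name is
hypothetical:

```java
public class StreamTimeTracker {
    // Stream time starts at the minimum possible value and only ever
    // increases as records are observed.
    private long streamTime = Long.MIN_VALUE;

    // Observe a record timestamp and return the updated stream time.
    // A late record (smaller timestamp) leaves stream time unchanged.
    public long advance(long recordTimestamp) {
        streamTime = Math.max(streamTime, recordTimestamp);
        return streamTime;
    }

    public static void main(String[] args) {
        StreamTimeTracker t = new StreamTimeTracker();
        System.out.println(t.advance(10));  // 10
        System.out.println(t.advance(101)); // 101
        System.out.println(t.advance(99));  // still 101: late record
    }
}
```

Because window expiry is checked against this monotonic clock rather
than wall-clock time, a burst of out-of-order timestamps cannot make
windows expire "early".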
>>>>>>>>>>
>>>>>>>>>> -Matthias
>>>>>>>>>>
>>>>>>>>>> On 12/11/16 3:47 PM, Jon Yeargers wrote:
>>>>>>>>>>> I've read this and still have more questions than answers.
>>>>>>>>>>> If my data skips about (timewise), what determines when a
>>>>>>>>>>> given window will start / stop accepting new data? What if
>>>>>>>>>>> I'm reading data from some time ago?
>>>>>>>>>>>
>>>>>>>>>>> On Sun, Dec 11, 2016 at 2:22 PM, Matthias J. Sax
>>>>>>>>>>> <matth...@confluent.io> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Please have a look here:
>>>>>>>>>>>>
>>>>>>>>>>>> http://docs.confluent.io/current/streams/developer-guide.html#windowing-a-stream
>>>>>>>>>>>>
>>>>>>>>>>>> If you have further questions, just follow up :)
>>>>>>>>>>>>
>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>
>>>>>>>>>>>> On 12/10/16 6:11 PM, Jon Yeargers wrote:
>>>>>>>>>>>>> I've added the 'until()' clause to some aggregation steps
>>>>>>>>>>>>> and it's working wonders for keeping the size of the state
>>>>>>>>>>>>> store within useful boundaries... But I'm not 100% clear
>>>>>>>>>>>>> on how it works.
>>>>>>>>>>>>>
>>>>>>>>>>>>> What is implied by the '.until()' clause? What determines
>>>>>>>>>>>>> when to stop receiving further data -- is it clock time
>>>>>>>>>>>>> (since the window was created)? It seems problematic for
>>>>>>>>>>>>> it to refer to EventTime, as this may bounce all over the
>>>>>>>>>>>>> place. For non-overlapping windows a given record can only
>>>>>>>>>>>>> fall into a single aggregation period -- so when would a
>>>>>>>>>>>>> value get discarded?
>>>>>>>>>>>>>
>>>>>>>>>>>>> I'm using 'groupByKey().aggregate(..., TimeWindows.of(60 *
>>>>>>>>>>>>> 1000L).until(10 * 1000L))' -- but what is this
>>>>>>>>>>>>> accomplishing?
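For the manual changelog-topic creation discussed earlier in the
thread, a CLI sketch of the advice (apply the Streams configs yourself,
with retention.ms matching the .until() value). Assumptions: a
0.10.x-era broker with ZooKeeper at localhost:2181; the topic name,
partition count, replication factor, and max.message.bytes value are
made up for illustration:

```shell
# Create a changelog topic manually with compaction plus deletion
# (compact,delete requires a 0.10.1+ broker), a 1-hour retention to
# match .until(3600 * 1000L), and a raised message-size limit.
kafka-topics.sh --create \
  --zookeeper localhost:2181 \
  --topic my-app-test-table-changelog \
  --partitions 4 \
  --replication-factor 1 \
  --config cleanup.policy=compact,delete \
  --config retention.ms=3600000 \
  --config max.message.bytes=2000000
```

Note the caveat from the thread: the topic name must exactly match the
auto-generated name Kafka Streams expects, and if the topic already
exists, Streams will not adjust its configuration for you.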