Hi Sachin,

I think we already have a way of doing what you want. If you create a custom state store, you can call the enableLogging method and pass in any configuration parameters you want. For example:

    import java.util.Collections;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.processor.StateStoreSupplier;
    import org.apache.kafka.streams.state.Stores;

    // Persistent key-value store; the map passed to enableLogging is used as the
    // config of its changelog topic (here retention.ms=1000)
    final StateStoreSupplier supplier = Stores.create("store")
            .withKeys(Serdes.String())
            .withValues(Serdes.String())
            .persistent()
            .enableLogging(Collections.singletonMap("retention.ms", "1000"))
            .build();

You can then use the overloaded methods in the DSL to pass the StateStoreSupplier to your aggregations (trunk only).

On Mon, 19 Dec 2016 at 10:58 Sachin Mittal <sjmit...@gmail.com> wrote:

> Hi,
> I am working towards adding topic configs as part of the streams config.
> However, I have run into an issue. The code flow is like this:
>
>     KStreamBuilder builder = new KStreamBuilder();
>     builder.stream(...)
>     ...
>     KafkaStreams streams = new KafkaStreams(builder, streamsProps);
>     streams.start();
>
> So we build the topology before building the streams instance.
> The state stores are assigned while the topology is being built, and at that
> time no topic config props are available, so the supplier is created with an
> empty topic config.
>
> Furthermore, StateStoreSupplier only has a method to get the config, not to
> update it:
>
>     Map<String, Object> logConfig()
>
> One way to implement this is to change this interface so that the log config
> props can be updated too. Then, when the props become available to streams,
> we update the topology builder's state stores with the updated config.
>
> The other way is to change KStreamBuilder so that it accepts the topic config.
> However, with the second approach we would be splitting the streams config
> into two parts.
>
> Let me know how to proceed with this.
>
> Thanks
> Sachin
>
>
> On Thu, Dec 15, 2016 at 2:27 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>
> > I agree. We have already had multiple requests to add an API for specifying
> > topic parameters for internal topics... I am pretty sure we will add it
> > if time permits -- feel free to contribute this new feature!
> >
> > About changing the value of until: that does not work, as the changelog
> > topic configuration would not be updated.
> >
> > -Matthias
> >
> > On 12/14/16 8:22 PM, Sachin Mittal wrote:
> > > Hi,
> > > I suggest including topic configs as part of the streams config
> > > properties, as we already do for producer and consumer configs.
> > > The supplied topic config would be used when creating internal changelog
> > > topics, along with certain additional configs that are applied by default.
> > >
> > > This way we never have to create internal topics manually.
> > >
> > > I have one doubt regarding until.
> > > Say I specify one value and run my streams app.
> > > Then I stop the app, specify a different value, and restart the app.
> > >
> > > Which retention value would the old (pre-existing) windows use: the
> > > older value or the new one?
> > >
> > > Thanks
> > > Sachin
> > >
> > > On Thu, Dec 15, 2016 at 12:26 AM, Matthias J. Sax <matth...@confluent.io> wrote:
> > >
> > >> Understood. Makes sense.
> > >>
> > >> For this, you should apply the Streams configs manually when creating those
> > >> topics. For the retention parameter, use the value you specify in the
> > >> corresponding .until() method.
> > >>
> > >> -Matthias
> > >>
> > >> On 12/14/16 10:08 AM, Sachin Mittal wrote:
> > >>> I was referring to the internal changelog topics. I had to create them
> > >>> manually because in some cases the message sizes in these topics exceeded
> > >>> the defaults used by Kafka Streams.
> > >>>
> > >>> I think someone in this group recommended creating these topics manually.
> > >>> I understand that it is better to have internal topics created by the
> > >>> streams app, and I will take a second look and see if that can be done.
> > >>>
> > >>> I just wanted to know which configs are applied to internal topics,
> > >>> so I can decide whether to stop creating them manually.
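Sachin's question above, which configs end up on internal topics, can be sketched in plain Java. This is a hypothetical model, not Kafka's code: the "topic." prefix stands in for the proposal to pass topic configs through the streams config, and the defaults reflect our understanding of Kafka Streams 0.10.1 (key-value changelogs use cleanup.policy=compact; window-store changelogs use compact,delete with retention.ms set to the until() value plus windowstore.changelog.additional.retention.ms, which defaults to one day). Check these against the release you run; as Matthias notes, Streams does not touch the config of a topic that already exists.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical sketch: how an internal changelog topic config could be assembled from
// streams properties. The "topic." prefix and the defaults are assumptions, not Kafka's API.
class ChangelogConfigSketch {
    static final String TOPIC_PREFIX = "topic."; // assumed prefix for user-supplied topic configs
    static final String ADDITIONAL_RETENTION = "windowstore.changelog.additional.retention.ms";
    static final String DEFAULT_ADDITIONAL_RETENTION_MS = "86400000"; // one day

    /** Returns the "topic."-prefixed entries of the streams properties, prefix stripped. */
    static Map<String, String> topicOverrides(Properties streamsProps) {
        Map<String, String> overrides = new HashMap<>();
        for (String name : streamsProps.stringPropertyNames()) {
            if (name.startsWith(TOPIC_PREFIX)) {
                overrides.put(name.substring(TOPIC_PREFIX.length()), streamsProps.getProperty(name));
            }
        }
        return overrides;
    }

    /** Default config for a changelog topic, with user overrides applied last. */
    static Map<String, String> changelogConfig(Properties streamsProps, boolean windowed, long untilMs) {
        Map<String, String> config = new HashMap<>();
        if (windowed) {
            // Window-store keys encode the window, so compaction alone never removes
            // old windows; time-based deletion keeps the topic from growing forever.
            long additionalMs = Long.parseLong(
                    streamsProps.getProperty(ADDITIONAL_RETENTION, DEFAULT_ADDITIONAL_RETENTION_MS));
            config.put("cleanup.policy", "compact,delete");
            config.put("retention.ms", Long.toString(untilMs + additionalMs));
        } else {
            config.put("cleanup.policy", "compact"); // key-value store: keep latest value per key
        }
        config.putAll(topicOverrides(streamsProps)); // user-supplied values take precedence
        return config;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("application.id", "demo");
        props.put("topic.max.message.bytes", "2097152"); // e.g. larger messages, as in this thread
        // A windowed aggregation with until(1 hour)
        System.out.println(changelogConfig(props, true, 3_600_000L));
    }
}
```

User overrides are applied last here, so an explicit topic.retention.ms would replace the computed value; whether a real implementation should allow that is a design choice this thread does not settle.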
> > >>>
> > >>> Thanks
> > >>> Sachin
> > >>>
> > >>> On Wed, Dec 14, 2016 at 11:08 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> > >>>
> > >>>> I am wondering about "I create internal topics manually" -- which topics
> > >>>> are you referring to, exactly?
> > >>>>
> > >>>> Kafka Streams creates all kinds of internal topics with auto-generated
> > >>>> names, so it would be quite tricky to create all of them manually
> > >>>> (especially because you need to know those names in advance).
> > >>>>
> > >>>> IIRC, if a topic already exists, Kafka Streams does not change its
> > >>>> configuration. Only if Kafka Streams creates the topic itself does it
> > >>>> set certain config parameters in the topic-creation step.
> > >>>>
> > >>>> -Matthias
> > >>>>
> > >>>> On 12/13/16 8:16 PM, Sachin Mittal wrote:
> > >>>>> Hi,
> > >>>>> Thanks for the explanation. This illustration makes it super easy to
> > >>>>> understand how until works. Perhaps we can update the wiki with this
> > >>>>> illustration.
> > >>>>> It is basically the retention time for a past window.
> > >>>>> I used to think until creates all the future windows for that period,
> > >>>>> and that once that time passes, it deletes all the past windows. However,
> > >>>>> until actually retains a window for the specified time. This makes much
> > >>>>> more sense.
> > >>>>>
> > >>>>> I just have one pending question regarding:
> > >>>>>
> > >>>>>> windowstore.changelog.additional.retention.ms
> > >>>>>
> > >>>>> How does this relate to the retention.ms parameter of the topic config?
> > >>>>> I create the internal topics manually using, say, retention.ms=3600000.
> > >>>>> In the next release (after kafka_2.10-0.10.0.1), we support deletion for
> > >>>>> internal changelog topics as well, and I want them retained for just
> > >>>>> 1 hour, say.
> > >>>>> So how does the above parameter interact with this topic-level setting?
> > >>>>> Or do I now just need to set the above config to 3600000 and not add
> > >>>>> retention.ms=3600000 when creating the internal topics?
> > >>>>>
> > >>>>> Thanks
> > >>>>> Sachin
> > >>>>>
> > >>>>> On Tue, Dec 13, 2016 at 11:27 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> > >>>>>
> > >>>>>> First, windows are only created if there is actual data for a window. So
> > >>>>>> you get windows [0, 50), [25, 75), [50, 100) only if there are records
> > >>>>>> falling into each window (btw: window start time is inclusive while
> > >>>>>> window end time is exclusive). If you have only 2 records, let's say
> > >>>>>> ts=20 and ts=90, you will not have an open window [25, 75). Each window
> > >>>>>> is physically created when the first record for it is processed.
> > >>>>>>
> > >>>>>> If you have the 4 windows above and a record with ts=101 arrives, a new
> > >>>>>> window [101, 151) will be created. Window [0, 50) will not be deleted yet,
> > >>>>>> because retention is 100, and thus Streams guarantees that all records
> > >>>>>> with ts >= 1 (= 101 - 100) are still processed correctly, and those
> > >>>>>> records would fall into window [0, 50).
> > >>>>>>
> > >>>>>> Thus, window [0, 50) can be dropped once time has advanced to ts = 150,
> > >>>>>> but not before that.
> > >>>>>>
> > >>>>>> -Matthias
> > >>>>>>
> > >>>>>> On 12/13/16 12:06 AM, Sachin Mittal wrote:
> > >>>>>>> Hi,
> > >>>>>>> So is until for the future or the past?
> > >>>>>>> Say I get the first record at t = 0, until is 100, and my window size
> > >>>>>>> is 50, advancing by 25.
> > >>>>>>> I understand it will create windows (0, 50), (25, 75), (50, 100).
> > >>>>>>> Now at t = 101 it will drop
> > >>>>>>> (0, 50), (25, 75), (50, 100) and create
> > >>>>>>> (101, 150), (125, 175), (150, 200).
> > >>>>>>>
> > >>>>>>> Please confirm whether this understanding is correct.
> > >>>>>>> It is not clear how it will handle overlapping windows such as
> > >>>>>>> (75, 125) and (175, 225), and so on.
> > >>>>>>>
> > >>>>>>> Another case that is not clear: say at t = 102 I get a message
> > >>>>>>> with timestamp 99. What happens then?
> > >>>>>>> Will the value be added to the previous aggregation of (50, 100) or
> > >>>>>>> (75, 125), as it should be?
> > >>>>>>>
> > >>>>>>> Or will it recreate the old window (50, 100), aggregate the value there,
> > >>>>>>> and then drop it? This would result in a wrong aggregated value, as it
> > >>>>>>> does not consider the previously aggregated values.
> > >>>>>>>
> > >>>>>>> So this is the pressing case I am not able to understand. Maybe I am
> > >>>>>>> wrong in some basic understanding.
> > >>>>>>>
> > >>>>>>> Next, the parameter
> > >>>>>>>> windowstore.changelog.additional.retention.ms
> > >>>>>>>
> > >>>>>>> How does this relate to the retention.ms parameter of the topic config?
> > >>>>>>> I create the internal topics manually using, say, retention.ms=3600000.
> > >>>>>>> In the next release (after kafka_2.10-0.10.0.1), we support deletion for
> > >>>>>>> internal changelog topics as well, and I want them retained for just
> > >>>>>>> 1 hour, say.
> > >>>>>>> So how does the above parameter interact with this topic-level setting?
> > >>>>>>> Or do I now just need to set the above config to 3600000 and not add
> > >>>>>>> retention.ms=3600000 when creating the internal topics?
> > >>>>>>> This is just another doubt remaining here.
> > >>>>>>>
> > >>>>>>> Thanks
> > >>>>>>> Sachin
> > >>>>>>>
> > >>>>>>> On Tue, Dec 13, 2016 at 3:02 AM, Matthias J. Sax <matth...@confluent.io> wrote:
> > >>>>>>>
> > >>>>>>>> Sachin,
> > >>>>>>>>
> > >>>>>>>> There is no reason to have an .until() AND a .retain() -- just increase
> > >>>>>>>> the value of .until().
> > >>>>>>>>
> > >>>>>>>> If you have a window of, let's say, 1h size and you also set .until()
> > >>>>>>>> to 1h, you obviously cannot process any late-arriving data. If you set
> > >>>>>>>> .until() to 2h in this example, you can process data that is up to 1h
> > >>>>>>>> delayed.
> > >>>>>>>>
> > >>>>>>>> So basically, the retention should always be larger than your window size.
> > >>>>>>>>
> > >>>>>>>> The parameter
> > >>>>>>>>> windowstore.changelog.additional.retention.ms
> > >>>>>>>>
> > >>>>>>>> applies to changelog topics that back up window state stores. Those
> > >>>>>>>> changelog topics are compacted. However, the key encodes a window ID,
> > >>>>>>>> and thus older data can never be cleaned up by compaction.
> > >>>>>>>> Therefore, an additional retention time is applied to those topics, too.
> > >>>>>>>> Thus, if an old window is not updated for this amount of time, it will
> > >>>>>>>> eventually get deleted, preventing the topic from growing infinitely.
> > >>>>>>>>
> > >>>>>>>> The value will be determined by until(), i.e., whatever you specify in
> > >>>>>>>> .until() will be used to set this parameter.
> > >>>>>>>>
> > >>>>>>>> -Matthias
> > >>>>>>>>
> > >>>>>>>> On 12/12/16 1:07 AM, Sachin Mittal wrote:
> > >>>>>>>>> Hi,
> > >>>>>>>>> We are facing exactly the problem Matthias described above.
> > >>>>>>>>> We are keeping the default until, which is 1 day.
> > >>>>>>>>>
> > >>>>>>>>> Our record's timestamp extractor uses a field that increases with time.
> > >>>>>>>>> However, over short periods we cannot guarantee that the timestamp
> > >>>>>>>>> always increases.
> > >>>>>>>>> So at the boundary, i.e., after 24 hours, we can get records that are
> > >>>>>>>>> beyond that window's retention period.
> > >>>>>>>>>
> > >>>>>>>>> Then it happens as mentioned above, and our aggregation fails.
> > >>>>>>>>>
> > >>>>>>>>> So, to sum up: when we get a record at 24h + 1 sec, the older window is
> > >>>>>>>>> deleted, and since the new record belongs to the new window, that
> > >>>>>>>>> window gets created.
> > >>>>>>>>> When we then get the next record at 24h - 1 sec, the older window has
> > >>>>>>>>> already been dropped, so the record does not get aggregated into that
> > >>>>>>>>> bucket.
> > >>>>>>>>>
> > >>>>>>>>> I suggest we add another setting next to until, called retain, which
> > >>>>>>>>> rolls the older windows over into the next window.
> > >>>>>>>>>
> > >>>>>>>>> I think at the stream window boundary it should use a sliding-window
> > >>>>>>>>> concept. So we could define a window like:
> > >>>>>>>>>
> > >>>>>>>>>     TimeWindows.of("test-table", 3600 * 1000L).advanceBy(1800 * 1000L)
> > >>>>>>>>>         .until(7 * 24 * 3600 * 1000L).retain(900 * 1000L)
> > >>>>>>>>>
> > >>>>>>>>> So after 7 days, it retains the data covered by the windows in the last
> > >>>>>>>>> 15 minutes and rolls that data over into the next window. This way,
> > >>>>>>>>> streams work continuously.
> > >>>>>>>>>
> > >>>>>>>>> Please let us know your thoughts on this.
> > >>>>>>>>>
> > >>>>>>>>> As a side question, there is a setting:
> > >>>>>>>>>
> > >>>>>>>>> windowstore.changelog.additional.retention.ms
> > >>>>>>>>> It is not clear what it does. Is this the default for until?
> > >>>>>>>>>
> > >>>>>>>>> Thanks
> > >>>>>>>>> Sachin
> > >>>>>>>>>
> > >>>>>>>>> On Mon, Dec 12, 2016 at 10:17 AM, Matthias J. Sax <matth...@confluent.io> wrote:
> > >>>>>>>>>
> > >>>>>>>>>> Windows are created on demand, i.e., each time a new record arrives and
> > >>>>>>>>>> there is no window for it yet, a new window gets created.
> > >>>>>>>>>>
> > >>>>>>>>>> Windows accept data until their retention time (which you can
> > >>>>>>>>>> configure via .until()) has passed. Thus, you will have many windows
> > >>>>>>>>>> open in parallel.
> > >>>>>>>>>>
> > >>>>>>>>>> If you read older data, it will just be put into the corresponding
> > >>>>>>>>>> windows (as long as the window retention time has not passed). If a
> > >>>>>>>>>> window was already discarded, a new window with this single
> > >>>>>>>>>> (late-arriving) record will get created, the computation will be
> > >>>>>>>>>> triggered, you get a result, and afterwards the window is deleted
> > >>>>>>>>>> again (as its retention time has already passed).
> > >>>>>>>>>>
> > >>>>>>>>>> The retention time is driven by "stream-time", an internally tracked
> > >>>>>>>>>> time that only progresses in the forward direction. It gets its value
> > >>>>>>>>>> from the timestamps provided by the TimestampExtractor -- thus, by
> > >>>>>>>>>> default it will be event-time.
> > >>>>>>>>>>
> > >>>>>>>>>> -Matthias
> > >>>>>>>>>>
> > >>>>>>>>>> On 12/11/16 3:47 PM, Jon Yeargers wrote:
> > >>>>>>>>>>> I've read this and still have more questions than answers. If my data
> > >>>>>>>>>>> skips about (time-wise), what determines when a given window will
> > >>>>>>>>>>> start or stop accepting new data? What if I'm reading data from some
> > >>>>>>>>>>> time ago?
> > >>>>>>>>>>>
> > >>>>>>>>>>> On Sun, Dec 11, 2016 at 2:22 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> > >>>>>>>>>>>
> > >>>>>>>>>>>> Please have a look here:
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> http://docs.confluent.io/current/streams/developer-guide.html#windowing-a-stream
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> If you have further questions, just follow up :)
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> -Matthias
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> On 12/10/16 6:11 PM, Jon Yeargers wrote:
> > >>>>>>>>>>>>> I've added the 'until()' clause to some aggregation steps, and it's
> > >>>>>>>>>>>>> working wonders for keeping the size of the state store within useful
> > >>>>>>>>>>>>> bounds... but I'm not 100% clear on how it works.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> What is implied by the '.until()' clause? What determines when to stop
> > >>>>>>>>>>>>> receiving further data -- is it clock time (since the window was
> > >>>>>>>>>>>>> created)? It seems problematic for it to refer to EventTime, as this
> > >>>>>>>>>>>>> may bounce all over the place. For non-overlapping windows, a given
> > >>>>>>>>>>>>> record can only fall into a single aggregation period -- so when would
> > >>>>>>>>>>>>> a value get discarded?
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> I'm using 'groupByKey().aggregate(..., TimeWindows.of(60 * 1000L).until(10 * 1000L))'
> > >>>>>>>>>>>>> -- but what is this accomplishing?
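To make the until() semantics discussed in this thread concrete, here is a small pure-Java model with no Kafka dependency. Its assumptions: hopping windows are aligned to the epoch (as far as we know, this is how TimeWindows assigns them, so with size 50 and advance 25 a record at ts=101 lands in [75, 125) and [100, 150) rather than in a window starting at 101); stream-time is the largest timestamp seen so far; and, following Matthias's explanation, a window [start, end) is kept while stream-time < end + retention. A record whose window was already discarded produces a one-record result that is not stored, mirroring his description of the 0.10.x behavior. The real window store drops whole time segments rather than individual windows, so exact drop times can differ.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Simplified model of windowed counting with until()-style retention (no Kafka dependency).
// Assumptions: epoch-aligned hopping windows, stream-time = max timestamp seen so far,
// and a window [start, start + size) is kept while streamTime < start + size + retention.
class WindowRetentionModel {
    private final long sizeMs;
    private final long advanceMs;
    private final long retentionMs;
    private long streamTime = Long.MIN_VALUE;
    private final TreeMap<Long, Long> store = new TreeMap<>(); // window start -> count

    WindowRetentionModel(long sizeMs, long advanceMs, long retentionMs) {
        this.sizeMs = sizeMs;
        this.advanceMs = advanceMs;
        this.retentionMs = retentionMs;
    }

    /** Starts of every window [s, s + size) containing ts (ts >= 0), s a multiple of advance. */
    static List<Long> windowStartsFor(long ts, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        long start = Math.max(0, ts - sizeMs + advanceMs) / advanceMs * advanceMs;
        for (; start <= ts; start += advanceMs) {
            starts.add(start);
        }
        return starts;
    }

    private boolean retained(long windowStart) {
        return streamTime < windowStart + sizeMs + retentionMs;
    }

    /** Counts one record and returns the emitted count for each window it falls into. */
    Map<Long, Long> process(long ts) {
        streamTime = Math.max(streamTime, ts);      // stream-time only moves forward
        store.keySet().removeIf(s -> !retained(s)); // drop windows past retention
        Map<Long, Long> results = new LinkedHashMap<>();
        for (long start : windowStartsFor(ts, sizeMs, advanceMs)) {
            if (retained(start)) {
                results.put(start, store.merge(start, 1L, Long::sum));
            } else {
                // Window already discarded: a fresh window sees only this record and is not kept.
                results.put(start, 1L);
            }
        }
        return results;
    }

    Set<Long> openWindows() {
        return new TreeSet<>(store.keySet());
    }

    public static void main(String[] args) {
        WindowRetentionModel m = new WindowRetentionModel(50, 25, 100); // size 50, advance 25, until 100
        System.out.println(m.process(0));   // {0=1}
        System.out.println(m.process(101)); // {75=1, 100=1}; [0, 50) is still retained
        System.out.println(m.process(99));  // late but within retention: {50=1, 75=2}
        System.out.println(m.process(150)); // {125=1, 150=1}; [0, 50) has now been dropped
        System.out.println(m.process(10));  // too late: {0=1} from a fresh, unstored window
    }
}
```

Running main walks through Sachin's example (size 50, advance 25, until 100): the late record at ts=99 is added to the existing windows, while the record at ts=10, arriving after stream-time reached 150, yields a fresh count of 1 for [0, 50), which is the wrong-aggregate case Sachin described.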