Hi Sachin,

I think we have a way of doing what you want already. If you create a
custom state store you can call the enableLogging method and pass in any
configuration parameters you want: For example:

final StateStoreSupplier supplier = Stores.create("store")
        .withKeys(Serdes.String())
        .withValues(Serdes.String())
        .persistent()
        .enableLogging(Collections.singletonMap("retention.ms", "1000"))
        .build();

You can then use the overloaded methods in the DSL to pass in the
StateStoreSupplier to your aggregates (trunk only)


On Mon, 19 Dec 2016 at 10:58 Sachin Mittal <sjmit...@gmail.com> wrote:

> Hi,
> I am working towards adding topic configs as part of streams config.
> However I have run into an issue:
> Code flow is like this
>
> KStreamBuilder builder = new KStreamBuilder();
> builder.stream(...)
> ...
> KafkaStreams streams = new KafkaStreams(builder, streamsProps);
> streams.start();
>
> So we can see we build the topology before building the streams.
> While building topology it assigns state store.
> That time no topic config props are available.
>
> So it creates the supplier with empty topic config.
>
> Further StateStoreSupplier has method just to get the config and not to
> update it.
> Map<String, Object> logConfig()
>
> One way to implement this is change this interface to be able to update the
> log config props too.
> And we the props are available to streams we update the topology builder's
> state stores too with updated config.
>
> Other way is to change the KStreamBuilder and make it pass the topic
> config.
> However in second approach we would be splitting the streams config into
> two parts.
>
> Let me know how should one proceed with this.
>
> Thanks
> Sachin
>
>
>
> On Thu, Dec 15, 2016 at 2:27 PM, Matthias J. Sax <matth...@confluent.io>
> wrote:
>
> > I agree. We got already multiple request to add an API for specifying
> > topic parameters for internal topic... I am pretty sure we will add it
> > if time permits -- feel free to contribute this new feature!
> >
> > About chancing the value of until: that does not work, as the changelog
> > topic configuration would not be updated.
> >
> >
> > -Matthias
> >
> > On 12/14/16 8:22 PM, Sachin Mittal wrote:
> > > Hi,
> > > I suggest to include topic config as well as part of streams config
> > > properties like we do for producer and consumer configs.
> > > The topic config supplied would be used for creating internal changelog
> > > topics along with certain additional configs which are applied by
> > default.
> > >
> > > This way we don't have to ever create internal topics manually.
> > >
> > > I had one doubt regarding until.
> > > Say I specify one value and run my streams app.
> > > Now I stop the app, specify different value and re start the app.
> > >
> > > Which value for retain would the old (pre existing) windows use. Would
> it
> > > be the older value or the new value?
> > >
> > > Thanks
> > > Sachin
> > >
> > >
> > >
> > > On Thu, Dec 15, 2016 at 12:26 AM, Matthias J. Sax <
> matth...@confluent.io
> > >
> > > wrote:
> > >
> > >> Understood. Makes sense.
> > >>
> > >> For this, you should apply Streams configs manually when creating
> those
> > >> topics. For retention parameter, use the value you specify in
> > >> corresponding .until() method for it.
> > >>
> > >>
> > >> -Matthias
> > >>
> > >>
> > >> On 12/14/16 10:08 AM, Sachin Mittal wrote:
> > >>> I was referring to internal change log topic. I had to create them
> > >> manually
> > >>> because in some case the message size of these topic were greater
> than
> > >> the
> > >>> default ones used by kafka streams.
> > >>>
> > >>> I think someone in this group recommended to create these topic
> > >> manually. I
> > >>> understand that it is better to have internal topics created by
> streams
> > >> app
> > >>> and I will take a second look at these and see if that can be done.
> > >>>
> > >>> I just wanted to make sure what all configs are applied to internal
> > >> topics
> > >>> in order to decide to avoid them creating manually.
> > >>>
> > >>> Thanks
> > >>> Sachin
> > >>>
> > >>>
> > >>> On Wed, Dec 14, 2016 at 11:08 PM, Matthias J. Sax <
> > matth...@confluent.io
> > >>>
> > >>> wrote:
> > >>>
> > >>>> I am wondering about "I create internal topic manually" -- which
> > topics
> > >>>> do you refer in detail?
> > >>>>
> > >>>> Kafka Streams create all kind of internal topics with auto-generated
> > >>>> names. So it would be quite tricky to create all of them manually
> > >>>> (especially because you need to know those name in advance).
> > >>>>
> > >>>> IRRC, if a topic does exist, Kafka Streams does no change it's
> > >>>> configuration. Only if Kafka Streams does create a topic, it will
> > >>>> specify certain config parameters on topic create step.
> > >>>>
> > >>>>
> > >>>> -Matthias
> > >>>>
> > >>>>
> > >>>>
> > >>>> On 12/13/16 8:16 PM, Sachin Mittal wrote:
> > >>>>> Hi,
> > >>>>> Thanks for the explanation. This illustration makes it super easy
> to
> > >>>>> understand how until works. Perhaps we can update the wiki with
> this
> > >>>>> illustration.
> > >>>>> It is basically the retention time for a past window.
> > >>>>> I used to think until creates all the future windows for that
> period
> > >> and
> > >>>>> when time passes that it used to delete all the past windows.
> However
> > >>>>> actually until retains a window for specified time. This makes so
> > much
> > >>>> more
> > >>>>> sense.
> > >>>>>
> > >>>>> I just had one pending query regarding:
> > >>>>>
> > >>>>>> windowstore.changelog.additional.retention.ms
> > >>>>>
> > >>>>> How does this relate to rentention.ms param of topic config?
> > >>>>> I create internal topic manually using say rentention.ms=3600000.
> > >>>>> In next release (post kafka_2.10-0.10.0.1) since we support delete
> of
> > >>>>> internal changelog topic as well and I want it to be retained for
> say
> > >>>> just
> > >>>>> 1 hour.
> > >>>>> So how does that above parameter interfere with this topic level
> > >> setting.
> > >>>>> Or now I just need to set above config as 3600000 and not add
> > >>>>> rentention.ms=3600000
> > >>>>> while creating internal topic.
> > >>>>>
> > >>>>> Thanks
> > >>>>> Sachin
> > >>>>>
> > >>>>>
> > >>>>> On Tue, Dec 13, 2016 at 11:27 PM, Matthias J. Sax <
> > >> matth...@confluent.io
> > >>>>>
> > >>>>> wrote:
> > >>>>>
> > >>>>>> First, windows are only created if there is actual data for a
> > window.
> > >> So
> > >>>>>> you get windows [0, 50), [25, 75), [50, 100) only if there are
> > record
> > >>>>>> falling into each window (btw: window start-time is inclusive
> while
> > >>>>>> window end time is exclusive). If you have only 2 record with lets
> > say
> > >>>>>> ts=20 and ts=90 you will not have an open window [25,75). Each
> > window
> > >> is
> > >>>>>> physically created each time the first record for it is processed.
> > >>>>>>
> > >>>>>> If you have above 4 windows and a record with ts=101 arrives, a
> new
> > >>>>>> window [101,151) will be created. Window [0,50) will not be
> deleted
> > >> yet,
> > >>>>>> because retention is 100 and thus Streams guarantees that all
> record
> > >>>>>> with ts >= 1 (= 101 - 100) are still processed correctly and those
> > >>>>>> records would fall into window [0,50).
> > >>>>>>
> > >>>>>> Thus, window [0,50) can be dropped, if time advanced to TS = 150,
> > but
> > >>>>>> not before that.
> > >>>>>>
> > >>>>>> -Matthias
> > >>>>>>
> > >>>>>>
> > >>>>>> On 12/13/16 12:06 AM, Sachin Mittal wrote:
> > >>>>>>> Hi,
> > >>>>>>> So is until for future or past?
> > >>>>>>> Say I get first record at t = 0 and until is 100 and my window
> size
> > >> is
> > >>>> 50
> > >>>>>>> advance by 25.
> > >>>>>>> I understand it will create windows (0, 50), (25, 75), (50, 100)
> > >>>>>>> Now at t = 101 it will drop
> > >>>>>>> (0, 50), (25, 75), (50, 100) and create
> > >>>>>>> (101, 150), (125, 175), (150, 200)
> > >>>>>>>
> > >>>>>>> Please confirm if this understanding us correct. It is not clear
> > how
> > >> it
> > >>>>>>> will handle overlapping windows (75, 125) and (175, 225) and so
> on?
> > >>>>>>>
> > >>>>>>> What case is not clear again is that at say t = 102 I get some
> > >> message
> > >>>>>> with
> > >>>>>>> timestamp 99. What happens then?
> > >>>>>>> Will the result added to previous aggregation of (50, 100) or
> (75,
> > >>>> 125),
> > >>>>>>> like it should.
> > >>>>>>>
> > >>>>>>> Or it will recreate the old window (50, 100) and aggregate the
> > value
> > >>>>>> there
> > >>>>>>> and then drop it. This would result is wrong aggregated value, as
> > it
> > >>>> does
> > >>>>>>> not consider the previous aggregated values.
> > >>>>>>>
> > >>>>>>> So this is the pressing case I am not able to understand. Maybe I
> > am
> > >>>>>> wrong
> > >>>>>>> at some basic understanding.
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> Next for
> > >>>>>>> The parameter
> > >>>>>>>> windowstore.changelog.additional.retention.ms
> > >>>>>>>
> > >>>>>>> How does this relate to rentention.ms param of topic config?
> > >>>>>>> I create internal topic manually using say rentention.ms
> =3600000.
> > >>>>>>> In next release (post kafka_2.10-0.10.0.1) since we support
> delete
> > of
> > >>>>>>> internal changelog topic as well and I want it to be retained for
> > say
> > >>>>>> just
> > >>>>>>> 1 hour.
> > >>>>>>> So how does that above parameter interfere with this topic level
> > >>>> setting.
> > >>>>>>> Or now I just need to set above config as 3600000 and not add
> > >>>>>>> rentention.ms=3600000
> > >>>>>>> while creating internal topic.
> > >>>>>>> This is just another doubt remaining here.
> > >>>>>>>
> > >>>>>>> Thanks
> > >>>>>>> Sachin
> > >>>>>>>
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> On Tue, Dec 13, 2016 at 3:02 AM, Matthias J. Sax <
> > >>>> matth...@confluent.io>
> > >>>>>>> wrote:
> > >>>>>>>
> > >>>>>>>> Sachin,
> > >>>>>>>>
> > >>>>>>>> There is no reason to have an .until() AND a .retain() -- just
> > >>>> increase
> > >>>>>>>> the value of .until()
> > >>>>>>>>
> > >>>>>>>> If you have a window of let's say 1h size and you set .until()
> > also
> > >> to
> > >>>>>>>> 1h -- you can obviously not process any late arriving data. If
> you
> > >> set
> > >>>>>>>> until() to 2h is this example, you can process data that is up
> to
> > 1h
> > >>>>>>>> delayed.
> > >>>>>>>>
> > >>>>>>>> So basically, the retention should always be larger than you
> > window
> > >>>>>> size.
> > >>>>>>>>
> > >>>>>>>> The parameter
> > >>>>>>>>> windowstore.changelog.additional.retention.ms
> > >>>>>>>>
> > >>>>>>>> is applies to changelog topics that backup window state stores.
> > >> Those
> > >>>>>>>> changelog topics are compacted. However, the used key does
> encode
> > an
> > >>>>>>>> window ID and thus older data can never be cleaned up by
> > compaction.
> > >>>>>>>> Therefore, an additional retention time is applied to those
> > topics,
> > >>>> too.
> > >>>>>>>> Thus, if an old window is not updated for this amount of time,
> it
> > >> will
> > >>>>>>>> get deleted eventually preventing this topic to grown
> infinitely.
> > >>>>>>>>
> > >>>>>>>> The value will be determined by until(), i.e., whatever you
> > specify
> > >> in
> > >>>>>>>> .until() will be used to set this parameter.
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> -Matthias
> > >>>>>>>>
> > >>>>>>>> On 12/12/16 1:07 AM, Sachin Mittal wrote:
> > >>>>>>>>> Hi,
> > >>>>>>>>> We are facing the exact problem as described by Matthias above.
> > >>>>>>>>> We are keeping default until which is 1 day.
> > >>>>>>>>>
> > >>>>>>>>> Our record's times tamp extractor has a field which increases
> > with
> > >>>>>> time.
> > >>>>>>>>> However for short time we cannot guarantee the time stamp is
> > always
> > >>>>>>>>> increases. So at the boundary ie after 24 hrs we can get
> records
> > >>>> which
> > >>>>>>>> are
> > >>>>>>>>> beyond that windows retention period.
> > >>>>>>>>>
> > >>>>>>>>> Then it happens like it is mentioned above and our aggregation
> > >> fails.
> > >>>>>>>>>
> > >>>>>>>>> So just to sum up when we get record
> > >>>>>>>>> 24h + 1 sec (it deletes older window and since the new record
> > >> belongs
> > >>>>>> to
> > >>>>>>>>> the new window its gets created)
> > >>>>>>>>> Now when we get next record of 24 hs - 1 sec since older window
> > is
> > >>>>>>>> dropped
> > >>>>>>>>> it does not get aggregated in that bucket.
> > >>>>>>>>>
> > >>>>>>>>> I suggest we have another setting next to until call retain
> which
> > >>>>>> retains
> > >>>>>>>>> the older windows into next window.
> > >>>>>>>>>
> > >>>>>>>>> I think at stream window boundary level it should use a concept
> > of
> > >>>>>>>> sliding
> > >>>>>>>>> window. So we can define window like
> > >>>>>>>>>
> > >>>>>>>>> TimeWindows.of("test-table", 3600 * 1000l).advanceBy(1800 *
> > >>>>>>>> 1000l).untill(7
> > >>>>>>>>> * 24 * 3600 * 1000l).retain(900 * 1000l)
> > >>>>>>>>>
> > >>>>>>>>> So after 7 days it retains the data covered by windows in last
> 15
> > >>>>>> minutes
> > >>>>>>>>> which rolls over the data in them to next window. This way
> > streams
> > >>>> work
> > >>>>>>>>> continuously.
> > >>>>>>>>>
> > >>>>>>>>> Please let us know your thoughts on this.
> > >>>>>>>>>
> > >>>>>>>>> On another side question on this there is a setting:
> > >>>>>>>>>
> > >>>>>>>>> windowstore.changelog.additional.retention.ms
> > >>>>>>>>> I is not clear what is does. Is this the default for until?
> > >>>>>>>>>
> > >>>>>>>>> Thanks
> > >>>>>>>>> Sachin
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> On Mon, Dec 12, 2016 at 10:17 AM, Matthias J. Sax <
> > >>>>>> matth...@confluent.io
> > >>>>>>>>>
> > >>>>>>>>> wrote:
> > >>>>>>>>>
> > >>>>>>>>>> Windows are created on demand, ie, each time a new record
> > arrives
> > >>>> and
> > >>>>>>>>>> there is no window yet for it, a new window will get created.
> > >>>>>>>>>>
> > >>>>>>>>>> Windows are accepting data until their retention time (that
> you
> > >> can
> > >>>>>>>>>> configure via .until()) passed. Thus, you will have many
> windows
> > >>>> being
> > >>>>>>>>>> open in parallel.
> > >>>>>>>>>>
> > >>>>>>>>>> If you read older data, they will just be put into the
> > >> corresponding
> > >>>>>>>>>> windows (as long as window retention time did not pass). If a
> > >> window
> > >>>>>> was
> > >>>>>>>>>> discarded already, a new window with this single (later
> > arriving)
> > >>>>>> record
> > >>>>>>>>>> will get created, the computation will be triggered, you get a
> > >>>> result,
> > >>>>>>>>>> and afterwards the window is deleted again (as it's retention
> > time
> > >>>>>>>>>> passed already).
> > >>>>>>>>>>
> > >>>>>>>>>> The retention time is driven by "stream-time", in internal
> > tracked
> > >>>>>> time
> > >>>>>>>>>> that only progressed in forward direction. It gets it value
> from
> > >> the
> > >>>>>>>>>> timestamps provided by TimestampExtractor -- thus, per default
> > it
> > >>>> will
> > >>>>>>>>>> be event-time.
> > >>>>>>>>>>
> > >>>>>>>>>> -Matthias
> > >>>>>>>>>>
> > >>>>>>>>>> On 12/11/16 3:47 PM, Jon Yeargers wrote:
> > >>>>>>>>>>> I've read this and still have more questions than answers. If
> > my
> > >>>> data
> > >>>>>>>>>> skips
> > >>>>>>>>>>> about (timewise) what determines when a given window will
> > start /
> > >>>>>> stop
> > >>>>>>>>>>> accepting new data? What if Im reading data from some time
> ago?
> > >>>>>>>>>>>
> > >>>>>>>>>>> On Sun, Dec 11, 2016 at 2:22 PM, Matthias J. Sax <
> > >>>>>>>> matth...@confluent.io>
> > >>>>>>>>>>> wrote:
> > >>>>>>>>>>>
> > >>>>>>>>>>>> Please have a look here:
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> http://docs.confluent.io/current/streams/developer-
> > >>>>>>>>>>>> guide.html#windowing-a-stream
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> If you have further question, just follow up :)
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> -Matthias
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> On 12/10/16 6:11 PM, Jon Yeargers wrote:
> > >>>>>>>>>>>>> Ive added the 'until()' clause to some aggregation steps
> and
> > >> it's
> > >>>>>>>>>> working
> > >>>>>>>>>>>>> wonders for keeping the size of the state store in useful
> > >>>>>>>> boundaries...
> > >>>>>>>>>>>> But
> > >>>>>>>>>>>>> Im not 100% clear on how it works.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> What is implied by the '.until()' clause? What determines
> > when
> > >> to
> > >>>>>>>> stop
> > >>>>>>>>>>>>> receiving further data - is it clock time (since the window
> > was
> > >>>>>>>>>> created)?
> > >>>>>>>>>>>>> It seems problematic for it to refer to EventTime as this
> may
> > >>>>>> bounce
> > >>>>>>>>>> all
> > >>>>>>>>>>>>> over the place. For non-overlapping windows a given record
> > can
> > >>>> only
> > >>>>>>>>>> fall
> > >>>>>>>>>>>>> into a single aggregation period - so when would a value
> get
> > >>>>>>>> discarded?
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Im using 'groupByKey(),aggregate(..., TimeWindows.of(60 *
> > >>>>>>>>>>>> 1000L).until(10 *
> > >>>>>>>>>>>>> 1000L))'  - but what is this accomplishing?
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>
> > >>>>
> > >>>>
> > >>>
> > >>
> > >>
> > >
> >
> >
>

Reply via email to