Hi,
I am working towards adding topic configs as part of streams config.
However I have run into an issue:
Code flow is like this

KStreamBuilder builder = new KStreamBuilder();
builder.stream(...)
...
KafkaStreams streams = new KafkaStreams(builder, streamsProps);
streams.start();

So we can see we build the topology before building the streams.
While building topology it assigns state store.
That time no topic config props are available.

So it creates the supplier with empty topic config.

Further StateStoreSupplier has method just to get the config and not to
update it.
Map<String, Object> logConfig()

One way to implement this is change this interface to be able to update the
log config props too.
And we the props are available to streams we update the topology builder's
state stores too with updated config.

Other way is to change the KStreamBuilder and make it pass the topic config.
However in second approach we would be splitting the streams config into
two parts.

Let me know how should one proceed with this.

Thanks
Sachin



On Thu, Dec 15, 2016 at 2:27 PM, Matthias J. Sax <matth...@confluent.io>
wrote:

> I agree. We got already multiple request to add an API for specifying
> topic parameters for internal topic... I am pretty sure we will add it
> if time permits -- feel free to contribute this new feature!
>
> About chancing the value of until: that does not work, as the changelog
> topic configuration would not be updated.
>
>
> -Matthias
>
> On 12/14/16 8:22 PM, Sachin Mittal wrote:
> > Hi,
> > I suggest to include topic config as well as part of streams config
> > properties like we do for producer and consumer configs.
> > The topic config supplied would be used for creating internal changelog
> > topics along with certain additional configs which are applied by
> default.
> >
> > This way we don't have to ever create internal topics manually.
> >
> > I had one doubt regarding until.
> > Say I specify one value and run my streams app.
> > Now I stop the app, specify different value and re start the app.
> >
> > Which value for retain would the old (pre existing) windows use. Would it
> > be the older value or the new value?
> >
> > Thanks
> > Sachin
> >
> >
> >
> > On Thu, Dec 15, 2016 at 12:26 AM, Matthias J. Sax <matth...@confluent.io
> >
> > wrote:
> >
> >> Understood. Makes sense.
> >>
> >> For this, you should apply Streams configs manually when creating those
> >> topics. For retention parameter, use the value you specify in
> >> corresponding .until() method for it.
> >>
> >>
> >> -Matthias
> >>
> >>
> >> On 12/14/16 10:08 AM, Sachin Mittal wrote:
> >>> I was referring to internal change log topic. I had to create them
> >> manually
> >>> because in some case the message size of these topic were greater than
> >> the
> >>> default ones used by kafka streams.
> >>>
> >>> I think someone in this group recommended to create these topic
> >> manually. I
> >>> understand that it is better to have internal topics created by streams
> >> app
> >>> and I will take a second look at these and see if that can be done.
> >>>
> >>> I just wanted to make sure what all configs are applied to internal
> >> topics
> >>> in order to decide to avoid them creating manually.
> >>>
> >>> Thanks
> >>> Sachin
> >>>
> >>>
> >>> On Wed, Dec 14, 2016 at 11:08 PM, Matthias J. Sax <
> matth...@confluent.io
> >>>
> >>> wrote:
> >>>
> >>>> I am wondering about "I create internal topic manually" -- which
> topics
> >>>> do you refer in detail?
> >>>>
> >>>> Kafka Streams create all kind of internal topics with auto-generated
> >>>> names. So it would be quite tricky to create all of them manually
> >>>> (especially because you need to know those name in advance).
> >>>>
> >>>> IRRC, if a topic does exist, Kafka Streams does no change it's
> >>>> configuration. Only if Kafka Streams does create a topic, it will
> >>>> specify certain config parameters on topic create step.
> >>>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>>
> >>>>
> >>>> On 12/13/16 8:16 PM, Sachin Mittal wrote:
> >>>>> Hi,
> >>>>> Thanks for the explanation. This illustration makes it super easy to
> >>>>> understand how until works. Perhaps we can update the wiki with this
> >>>>> illustration.
> >>>>> It is basically the retention time for a past window.
> >>>>> I used to think until creates all the future windows for that period
> >> and
> >>>>> when time passes that it used to delete all the past windows. However
> >>>>> actually until retains a window for specified time. This makes so
> much
> >>>> more
> >>>>> sense.
> >>>>>
> >>>>> I just had one pending query regarding:
> >>>>>
> >>>>>> windowstore.changelog.additional.retention.ms
> >>>>>
> >>>>> How does this relate to rentention.ms param of topic config?
> >>>>> I create internal topic manually using say rentention.ms=3600000.
> >>>>> In next release (post kafka_2.10-0.10.0.1) since we support delete of
> >>>>> internal changelog topic as well and I want it to be retained for say
> >>>> just
> >>>>> 1 hour.
> >>>>> So how does that above parameter interfere with this topic level
> >> setting.
> >>>>> Or now I just need to set above config as 3600000 and not add
> >>>>> rentention.ms=3600000
> >>>>> while creating internal topic.
> >>>>>
> >>>>> Thanks
> >>>>> Sachin
> >>>>>
> >>>>>
> >>>>> On Tue, Dec 13, 2016 at 11:27 PM, Matthias J. Sax <
> >> matth...@confluent.io
> >>>>>
> >>>>> wrote:
> >>>>>
> >>>>>> First, windows are only created if there is actual data for a
> window.
> >> So
> >>>>>> you get windows [0, 50), [25, 75), [50, 100) only if there are
> record
> >>>>>> falling into each window (btw: window start-time is inclusive while
> >>>>>> window end time is exclusive). If you have only 2 record with lets
> say
> >>>>>> ts=20 and ts=90 you will not have an open window [25,75). Each
> window
> >> is
> >>>>>> physically created each time the first record for it is processed.
> >>>>>>
> >>>>>> If you have above 4 windows and a record with ts=101 arrives, a new
> >>>>>> window [101,151) will be created. Window [0,50) will not be deleted
> >> yet,
> >>>>>> because retention is 100 and thus Streams guarantees that all record
> >>>>>> with ts >= 1 (= 101 - 100) are still processed correctly and those
> >>>>>> records would fall into window [0,50).
> >>>>>>
> >>>>>> Thus, window [0,50) can be dropped, if time advanced to TS = 150,
> but
> >>>>>> not before that.
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
> >>>>>>
> >>>>>> On 12/13/16 12:06 AM, Sachin Mittal wrote:
> >>>>>>> Hi,
> >>>>>>> So is until for future or past?
> >>>>>>> Say I get first record at t = 0 and until is 100 and my window size
> >> is
> >>>> 50
> >>>>>>> advance by 25.
> >>>>>>> I understand it will create windows (0, 50), (25, 75), (50, 100)
> >>>>>>> Now at t = 101 it will drop
> >>>>>>> (0, 50), (25, 75), (50, 100) and create
> >>>>>>> (101, 150), (125, 175), (150, 200)
> >>>>>>>
> >>>>>>> Please confirm if this understanding us correct. It is not clear
> how
> >> it
> >>>>>>> will handle overlapping windows (75, 125) and (175, 225) and so on?
> >>>>>>>
> >>>>>>> What case is not clear again is that at say t = 102 I get some
> >> message
> >>>>>> with
> >>>>>>> timestamp 99. What happens then?
> >>>>>>> Will the result added to previous aggregation of (50, 100) or (75,
> >>>> 125),
> >>>>>>> like it should.
> >>>>>>>
> >>>>>>> Or it will recreate the old window (50, 100) and aggregate the
> value
> >>>>>> there
> >>>>>>> and then drop it. This would result is wrong aggregated value, as
> it
> >>>> does
> >>>>>>> not consider the previous aggregated values.
> >>>>>>>
> >>>>>>> So this is the pressing case I am not able to understand. Maybe I
> am
> >>>>>> wrong
> >>>>>>> at some basic understanding.
> >>>>>>>
> >>>>>>>
> >>>>>>> Next for
> >>>>>>> The parameter
> >>>>>>>> windowstore.changelog.additional.retention.ms
> >>>>>>>
> >>>>>>> How does this relate to rentention.ms param of topic config?
> >>>>>>> I create internal topic manually using say rentention.ms=3600000.
> >>>>>>> In next release (post kafka_2.10-0.10.0.1) since we support delete
> of
> >>>>>>> internal changelog topic as well and I want it to be retained for
> say
> >>>>>> just
> >>>>>>> 1 hour.
> >>>>>>> So how does that above parameter interfere with this topic level
> >>>> setting.
> >>>>>>> Or now I just need to set above config as 3600000 and not add
> >>>>>>> rentention.ms=3600000
> >>>>>>> while creating internal topic.
> >>>>>>> This is just another doubt remaining here.
> >>>>>>>
> >>>>>>> Thanks
> >>>>>>> Sachin
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> On Tue, Dec 13, 2016 at 3:02 AM, Matthias J. Sax <
> >>>> matth...@confluent.io>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> Sachin,
> >>>>>>>>
> >>>>>>>> There is no reason to have an .until() AND a .retain() -- just
> >>>> increase
> >>>>>>>> the value of .until()
> >>>>>>>>
> >>>>>>>> If you have a window of let's say 1h size and you set .until()
> also
> >> to
> >>>>>>>> 1h -- you can obviously not process any late arriving data. If you
> >> set
> >>>>>>>> until() to 2h is this example, you can process data that is up to
> 1h
> >>>>>>>> delayed.
> >>>>>>>>
> >>>>>>>> So basically, the retention should always be larger than you
> window
> >>>>>> size.
> >>>>>>>>
> >>>>>>>> The parameter
> >>>>>>>>> windowstore.changelog.additional.retention.ms
> >>>>>>>>
> >>>>>>>> is applies to changelog topics that backup window state stores.
> >> Those
> >>>>>>>> changelog topics are compacted. However, the used key does encode
> an
> >>>>>>>> window ID and thus older data can never be cleaned up by
> compaction.
> >>>>>>>> Therefore, an additional retention time is applied to those
> topics,
> >>>> too.
> >>>>>>>> Thus, if an old window is not updated for this amount of time, it
> >> will
> >>>>>>>> get deleted eventually preventing this topic to grown infinitely.
> >>>>>>>>
> >>>>>>>> The value will be determined by until(), i.e., whatever you
> specify
> >> in
> >>>>>>>> .until() will be used to set this parameter.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> -Matthias
> >>>>>>>>
> >>>>>>>> On 12/12/16 1:07 AM, Sachin Mittal wrote:
> >>>>>>>>> Hi,
> >>>>>>>>> We are facing the exact problem as described by Matthias above.
> >>>>>>>>> We are keeping default until which is 1 day.
> >>>>>>>>>
> >>>>>>>>> Our record's times tamp extractor has a field which increases
> with
> >>>>>> time.
> >>>>>>>>> However for short time we cannot guarantee the time stamp is
> always
> >>>>>>>>> increases. So at the boundary ie after 24 hrs we can get records
> >>>> which
> >>>>>>>> are
> >>>>>>>>> beyond that windows retention period.
> >>>>>>>>>
> >>>>>>>>> Then it happens like it is mentioned above and our aggregation
> >> fails.
> >>>>>>>>>
> >>>>>>>>> So just to sum up when we get record
> >>>>>>>>> 24h + 1 sec (it deletes older window and since the new record
> >> belongs
> >>>>>> to
> >>>>>>>>> the new window its gets created)
> >>>>>>>>> Now when we get next record of 24 hs - 1 sec since older window
> is
> >>>>>>>> dropped
> >>>>>>>>> it does not get aggregated in that bucket.
> >>>>>>>>>
> >>>>>>>>> I suggest we have another setting next to until call retain which
> >>>>>> retains
> >>>>>>>>> the older windows into next window.
> >>>>>>>>>
> >>>>>>>>> I think at stream window boundary level it should use a concept
> of
> >>>>>>>> sliding
> >>>>>>>>> window. So we can define window like
> >>>>>>>>>
> >>>>>>>>> TimeWindows.of("test-table", 3600 * 1000l).advanceBy(1800 *
> >>>>>>>> 1000l).untill(7
> >>>>>>>>> * 24 * 3600 * 1000l).retain(900 * 1000l)
> >>>>>>>>>
> >>>>>>>>> So after 7 days it retains the data covered by windows in last 15
> >>>>>> minutes
> >>>>>>>>> which rolls over the data in them to next window. This way
> streams
> >>>> work
> >>>>>>>>> continuously.
> >>>>>>>>>
> >>>>>>>>> Please let us know your thoughts on this.
> >>>>>>>>>
> >>>>>>>>> On another side question on this there is a setting:
> >>>>>>>>>
> >>>>>>>>> windowstore.changelog.additional.retention.ms
> >>>>>>>>> I is not clear what is does. Is this the default for until?
> >>>>>>>>>
> >>>>>>>>> Thanks
> >>>>>>>>> Sachin
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Mon, Dec 12, 2016 at 10:17 AM, Matthias J. Sax <
> >>>>>> matth...@confluent.io
> >>>>>>>>>
> >>>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Windows are created on demand, ie, each time a new record
> arrives
> >>>> and
> >>>>>>>>>> there is no window yet for it, a new window will get created.
> >>>>>>>>>>
> >>>>>>>>>> Windows are accepting data until their retention time (that you
> >> can
> >>>>>>>>>> configure via .until()) passed. Thus, you will have many windows
> >>>> being
> >>>>>>>>>> open in parallel.
> >>>>>>>>>>
> >>>>>>>>>> If you read older data, they will just be put into the
> >> corresponding
> >>>>>>>>>> windows (as long as window retention time did not pass). If a
> >> window
> >>>>>> was
> >>>>>>>>>> discarded already, a new window with this single (later
> arriving)
> >>>>>> record
> >>>>>>>>>> will get created, the computation will be triggered, you get a
> >>>> result,
> >>>>>>>>>> and afterwards the window is deleted again (as it's retention
> time
> >>>>>>>>>> passed already).
> >>>>>>>>>>
> >>>>>>>>>> The retention time is driven by "stream-time", in internal
> tracked
> >>>>>> time
> >>>>>>>>>> that only progressed in forward direction. It gets it value from
> >> the
> >>>>>>>>>> timestamps provided by TimestampExtractor -- thus, per default
> it
> >>>> will
> >>>>>>>>>> be event-time.
> >>>>>>>>>>
> >>>>>>>>>> -Matthias
> >>>>>>>>>>
> >>>>>>>>>> On 12/11/16 3:47 PM, Jon Yeargers wrote:
> >>>>>>>>>>> I've read this and still have more questions than answers. If
> my
> >>>> data
> >>>>>>>>>> skips
> >>>>>>>>>>> about (timewise) what determines when a given window will
> start /
> >>>>>> stop
> >>>>>>>>>>> accepting new data? What if Im reading data from some time ago?
> >>>>>>>>>>>
> >>>>>>>>>>> On Sun, Dec 11, 2016 at 2:22 PM, Matthias J. Sax <
> >>>>>>>> matth...@confluent.io>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Please have a look here:
> >>>>>>>>>>>>
> >>>>>>>>>>>> http://docs.confluent.io/current/streams/developer-
> >>>>>>>>>>>> guide.html#windowing-a-stream
> >>>>>>>>>>>>
> >>>>>>>>>>>> If you have further question, just follow up :)
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> On 12/10/16 6:11 PM, Jon Yeargers wrote:
> >>>>>>>>>>>>> Ive added the 'until()' clause to some aggregation steps and
> >> it's
> >>>>>>>>>> working
> >>>>>>>>>>>>> wonders for keeping the size of the state store in useful
> >>>>>>>> boundaries...
> >>>>>>>>>>>> But
> >>>>>>>>>>>>> Im not 100% clear on how it works.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> What is implied by the '.until()' clause? What determines
> when
> >> to
> >>>>>>>> stop
> >>>>>>>>>>>>> receiving further data - is it clock time (since the window
> was
> >>>>>>>>>> created)?
> >>>>>>>>>>>>> It seems problematic for it to refer to EventTime as this may
> >>>>>> bounce
> >>>>>>>>>> all
> >>>>>>>>>>>>> over the place. For non-overlapping windows a given record
> can
> >>>> only
> >>>>>>>>>> fall
> >>>>>>>>>>>>> into a single aggregation period - so when would a value get
> >>>>>>>> discarded?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Im using 'groupByKey(),aggregate(..., TimeWindows.of(60 *
> >>>>>>>>>>>> 1000L).until(10 *
> >>>>>>>>>>>>> 1000L))'  - but what is this accomplishing?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>>
> >>>
> >>
> >>
> >
>
>

Reply via email to