Sachin,
my comment about deleting was about deleting values from the list, not
about deleting the whole key/value record. If you want to delete a whole
key/value record when there has been no update for it for some time, you
can combine compaction with retention. You need to alter the
configuration of the changelog topics and set

  cleanup.policy=compact,delete

Then, retention.ms will be applied to the changelog, too.

-Matthias

On 11/10/16 3:17 AM, Sachin Mittal wrote:
> Hi,
> As per Eno's suggestion I have pre-created the internal changelog
> topics with an increased max.message.bytes config to handle the big
> messages, which grow over time.
>
> As Matthias has pointed out, we cannot use the retention.ms setting to
> delete older message data after a given time. Is there a way to purge
> older messages from my changelog topic?
>
> Remember, my changelog topic is key=list of objects, and this grows
> with time.
>
> So I would like these to be deleted from time to time, because I would
> have already consumed the objects, so that key/value can be deleted.
> Later, if I get a new object for the same key, then that's a new
> message and the old data has no use for the streaming application.
>
> So how can I achieve this? Would retention.bytes help here?
>
> Is there a way I can set something like an expire-after at the message
> level, so that some Kafka thread would purge those messages?
>
> Thanks
> Sachin
>
>
> On Wed, Nov 9, 2016 at 1:42 AM, Matthias J. Sax
> <matth...@confluent.io> wrote:
>
> My two cents:
>
> Changelog topics are compacted topics, thus they do not have a
> retention time (there is an exception for windowed KTable changelog
> topics, which are compacted and do have a retention time, though).
>
> However, I do not understand how changing the retention time should
> fix the issue. If your list of values grows and exceeds
> max.message.bytes, you will need to increase this parameter (or
> shrink your value).
>
> Besides this, Eno's answer is the way to go.
> In order to figure out the internal topic names, you can use
> KafkaStreams#toString().
>
>
> -Matthias
>
>
> On 11/8/16 11:14 AM, Eno Thereska wrote:
>>>> Hi Sachin,
>>>>
>>>> One option right now would be to pre-create all internal topics in
>>>> Kafka, and only after that start the Kafka Streams application.
>>>> This would require you knowing the internal names of the topics
>>>> (in this case you probably already know them, but I agree that in
>>>> general this is a bit cumbersome).
>>>>
>>>> Eno
>>>>
>>>>> On 8 Nov 2016, at 18:10, Sachin Mittal <sjmit...@gmail.com>
>>>>> wrote:
>>>>>
>>>>> Per-message payload size. The basic question is how I can
>>>>> control the internal changelog topics' parameters so as to avoid
>>>>> these errors.
>>>>>
>>>>>
>>>>> On Tue, Nov 8, 2016 at 11:37 PM, R Krishna
>>>>> <krishna...@gmail.com> wrote:
>>>>>
>>>>>> Are you talking about total messages (and therefore size) or
>>>>>> per-message payload size?
>>>>>>
>>>>>> On Tue, Nov 8, 2016 at 10:00 AM, Sachin Mittal
>>>>>> <sjmit...@gmail.com> wrote:
>>>>>>
>>>>>>> The message size itself increases over time.
>>>>>>>
>>>>>>> A message is something like key=[list of objects].
>>>>>>>
>>>>>>> This increases with time, and then at some point Kafka is not
>>>>>>> able to add the message to its topic because the message size
>>>>>>> is greater than max.message.bytes. Since this is an internal
>>>>>>> topic based off a table, I don't know how I can control this
>>>>>>> topic.
>>>>>>>
>>>>>>> If I can set some retention.ms for this topic, then I can
>>>>>>> purge old messages, thereby ensuring that the message size
>>>>>>> stays within the limit.
>>>>>>>
>>>>>>> Thanks
>>>>>>> Sachin
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Nov 8, 2016 at 11:22 PM, Eno Thereska
>>>>>>> <eno.there...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Sachin,
>>>>>>>>
>>>>>>>> Could you clarify what you mean by "message size increases"?
>>>>>>>> Are messages going to the changelog topic increasing in
>>>>>>>> size? Or is the changelog topic getting full?
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>> Eno
>>>>>>>>
>>>>>>>>> On 8 Nov 2016, at 16:49, Sachin Mittal
>>>>>>>>> <sjmit...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>> We are using aggregation by key on a KStream to create a
>>>>>>>>> KTable. As I read from
>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams%3A+Internal+Data+Management
>>>>>>>>> it creates an internal changelog topic.
>>>>>>>>>
>>>>>>>>> However, the longer the streaming application runs, the
>>>>>>>>> more the message size increases, and it starts throwing a
>>>>>>>>> max.message.bytes exception.
>>>>>>>>>
>>>>>>>>> Is there a way to control the retention.ms time for
>>>>>>>>> internal changelog topics so that messages are purged
>>>>>>>>> before they exceed this size?
>>>>>>>>>
>>>>>>>>> If not, is there a way to control or avoid such an error?
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>> Sachin
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Radha Krishna, Proddaturi
>>>>>> 253-234-5657
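
P.S. As a concrete sketch of the config change: altering an existing
changelog topic with kafka-configs.sh could look like the following.
The topic name, ZooKeeper address, and the specific retention.ms and
max.message.bytes values here are just examples; use
KafkaStreams#toString() to find your actual changelog topic name.

```shell
# Enable both compaction and time-based retention on an existing
# changelog topic. Square brackets let us pass the comma-separated
# value "compact,delete" as a single config value to --add-config.
# (Hypothetical topic name and a 0.10.x-style --zookeeper invocation.)
bin/kafka-configs.sh --zookeeper localhost:2181 \
  --entity-type topics --entity-name my-app-Counts-changelog \
  --alter \
  --add-config 'cleanup.policy=[compact,delete],retention.ms=86400000,max.message.bytes=10485760'
```

With cleanup.policy=compact,delete, segments older than retention.ms
are eligible for deletion even if they hold the latest value for a key,
so key/value records that receive no further updates eventually
disappear from the changelog.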