Sachin,

my comment about deleting was about deleting values from the list, not
about deleting the whole key/value record.

If you want to delete a whole key/value record when there has been no
update for it for some time, you can combine compaction with
retention. You need to alter the configuration of the changelog
topics and set

cleanup.policy=compact,delete

Then, retention.ms will be applied to the changelog, too.
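
For example, here is a minimal sketch using the Java AdminClient
(available in newer Kafka clients; on older clusters the same change
can be made with the kafka-configs.sh tool). The changelog topic name
and the 7-day retention value are made-up placeholders:

import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class SetChangelogCleanupPolicy {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // changelog topics are named <application.id>-<store name>-changelog
            ConfigResource topic = new ConfigResource(
                ConfigResource.Type.TOPIC, "my-app-my-store-changelog");
            admin.incrementalAlterConfigs(Collections.singletonMap(topic, Arrays.asList(
                new AlterConfigOp(new ConfigEntry("cleanup.policy", "compact,delete"),
                    AlterConfigOp.OpType.SET),
                // 7 days; records older than this become eligible for deletion
                new AlterConfigOp(new ConfigEntry("retention.ms", "604800000"),
                    AlterConfigOp.OpType.SET)))).all().get();
        }
    }
}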


-Matthias

On 11/10/16 3:17 AM, Sachin Mittal wrote:
> Hi, As per Eno's suggestion I have pre-created the internal
> changelog topics with an increased max.message.bytes config to
> handle big messages that grow over time.
> 
> As Matthias has pointed out, we cannot use the retention.ms setting
> to delete older message data after a given time. So is there a way
> to purge older messages from my changelog topic?
> 
> Remember, my changelog topic holds key=list-of-objects messages,
> and the lists grow with time.
> 
> So I would like these to be deleted from time to time: since I
> would have already consumed the objects, the key/value record can
> be deleted. If I later get a new object for the same key, that is a
> new message, and the old data has no use for the streaming
> application.
> 
> So how can I achieve this? Would retention.bytes help here?
> 
> Is there a way to set an expire-after (or something similar) at the
> message level, so that some Kafka thread would purge those messages?
> 
> Thanks Sachin
> 
> 
> 
> On Wed, Nov 9, 2016 at 1:42 AM, Matthias J. Sax
> <matth...@confluent.io> wrote:
> 
> My two cents:
> 
> Changelog topics are compacted topics, thus they do not have a
> retention time (there is an exception for windowed KTable changelog
> topics, which are compacted and do have a retention time, though).
> 
> However, I do not understand how changing the retention time would
> fix the issue. If your list of values grows and exceeds
> max.message.bytes, you will need to increase this parameter (or
> shrink your value).
> 
> Besides this, Eno's answer is the way to go. In order to figure
> out internal topic names, you can use KafkaStreams#toString().
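> 
> For example, a minimal sketch (the application id, servers, and
> topology are placeholders), using the 0.10.x-era API:
> 
> import java.util.Properties;
> import org.apache.kafka.streams.KafkaStreams;
> import org.apache.kafka.streams.StreamsConfig;
> import org.apache.kafka.streams.kstream.KStreamBuilder;
> 
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> 
> KStreamBuilder builder = new KStreamBuilder();
> // ... define your topology here ...
> 
> KafkaStreams streams = new KafkaStreams(builder, props);
> streams.start();
> // prints the topology, including internal topic names such as
> // <application.id>-<store name>-changelog
> System.out.println(streams.toString());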
> 
> 
> -Matthias
> 
> 
> 
> On 11/8/16 11:14 AM, Eno Thereska wrote:
>>>> Hi Sachin,
>>>> 
>>>> One option right now would be to pre-create all internal
>>>> topics in Kafka, and only after that start the Kafka Streams
>>>> application. This would require knowing the internal names of
>>>> the topics (in this case you probably already know them, but
>>>> I agree that in general this is a bit cumbersome).
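>>>> 
>>>> For example, a minimal sketch with the Java AdminClient from
>>>> newer clients (topic name, partition count, replication factor,
>>>> and configs below are placeholders; at the time this would be
>>>> done with the kafka-topics.sh tool). Note that the partition
>>>> count of a pre-created changelog topic has to match the
>>>> partition count of the topic it is derived from:
>>>> 
>>>> import java.util.Collections;
>>>> import java.util.HashMap;
>>>> import java.util.Map;
>>>> import java.util.Properties;
>>>> import org.apache.kafka.clients.admin.Admin;
>>>> import org.apache.kafka.clients.admin.AdminClientConfig;
>>>> import org.apache.kafka.clients.admin.NewTopic;
>>>> 
>>>> public class PreCreateChangelog {
>>>>     public static void main(String[] args) throws Exception {
>>>>         Properties props = new Properties();
>>>>         props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>>>>         try (Admin admin = Admin.create(props)) {
>>>>             Map<String, String> configs = new HashMap<>();
>>>>             configs.put("cleanup.policy", "compact");
>>>>             configs.put("max.message.bytes", "10485760"); // 10 MB
>>>>             NewTopic changelog =
>>>>                 new NewTopic("my-app-my-store-changelog", 4, (short) 3)
>>>>                     .configs(configs);
>>>>             admin.createTopics(Collections.singletonList(changelog)).all().get();
>>>>         }
>>>>     }
>>>> }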
>>>> 
>>>> Eno
>>>> 
>>>>> On 8 Nov 2016, at 18:10, Sachin Mittal
>>>>> <sjmit...@gmail.com> wrote:
>>>>> 
>>>>> Per-message payload size. The basic question is: how can I
>>>>> control the internal changelog topic's parameters so as to
>>>>> avoid these errors?
>>>>> 
>>>>> 
>>>>> On Tue, Nov 8, 2016 at 11:37 PM, R Krishna
>>>>> <krishna...@gmail.com> wrote:
>>>>> 
>>>>>> Are you talking about the total number of messages (and
>>>>>> therefore total size), or the per-message payload size?
>>>>>> 
>>>>>> On Tue, Nov 8, 2016 at 10:00 AM, Sachin Mittal 
>>>>>> <sjmit...@gmail.com> wrote:
>>>>>> 
>>>>>>> Message size itself increases over time.
>>>>>>> 
>>>>>>> A message is something like key=[list of objects].
>>>>>>> 
>>>>>>> This list grows with time, and at some point Kafka is not
>>>>>>> able to add a message to the topic because the message
>>>>>>> size is greater than max.message.bytes. Since this is an
>>>>>>> internal topic based on a table, I don't know how I can
>>>>>>> control this topic.
>>>>>>> 
>>>>>>> If I could set some retention.ms for this topic, then I
>>>>>>> could purge old messages, thereby ensuring that the
>>>>>>> message size stays within the limit.
>>>>>>> 
>>>>>>> Thanks Sachin
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> On Tue, Nov 8, 2016 at 11:22 PM, Eno Thereska 
>>>>>>> <eno.there...@gmail.com> wrote:
>>>>>>> 
>>>>>>>> Hi Sachin,
>>>>>>>> 
>>>>>>>> Could you clarify what you mean by "message size
>>>>>>>> increases"? Are messages going to the changelog topic
>>>>>>>> increasing in size? Or is the changelog topic getting
>>>>>>>> full?
>>>>>>>> 
>>>>>>>> Thanks Eno
>>>>>>>> 
>>>>>>>>> On 8 Nov 2016, at 16:49, Sachin Mittal 
>>>>>>>>> <sjmit...@gmail.com> wrote:
>>>>>>>>> 
>>>>>>>>> Hi, We are using aggregation by key on a KStream to
>>>>>>>>> create a KTable. As I read from
>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams%3A+Internal+Data+Management
>>>>>>>>> it creates an internal changelog topic.
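>>>>>>>>> 
>>>>>>>>> For reference, a minimal sketch of that kind of
>>>>>>>>> aggregation with the 0.10.x API (topic, store name, and
>>>>>>>>> types are placeholders):
>>>>>>>>> 
>>>>>>>>> import org.apache.kafka.common.serialization.Serdes;
>>>>>>>>> import org.apache.kafka.streams.kstream.*;
>>>>>>>>> 
>>>>>>>>> KStreamBuilder builder = new KStreamBuilder();
>>>>>>>>> KStream<String, String> events = builder.stream("events");
>>>>>>>>> // each new event is appended to the per-key list, so the
>>>>>>>>> // value written to the changelog grows over time
>>>>>>>>> KTable<String, String> perKeyList = events
>>>>>>>>>     .groupByKey()
>>>>>>>>>     .aggregate(
>>>>>>>>>         () -> "",                               // initializer
>>>>>>>>>         (key, value, agg) -> agg + "," + value, // adder
>>>>>>>>>         Serdes.String(),                        // aggregate serde
>>>>>>>>>         "my-store"); // backed by <application.id>-my-store-changelog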
>>>>>>>>> 
>>>>>>>>> However, the longer the streaming application runs, the
>>>>>>>>> larger the messages grow, and it starts throwing a
>>>>>>>>> max.message.bytes exception.
>>>>>>>>> 
>>>>>>>>> Is there a way to control the retention.ms time for
>>>>>>>>> internal changelog topics so that messages are purged
>>>>>>>>> before they exceed this size?
>>>>>>>>> 
>>>>>>>>> If not, is there a way to control or avoid such an
>>>>>>>>> error?
>>>>>>>>> 
>>>>>>>>> Thanks Sachin
>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> -- Radha Krishna, Proddaturi 253-234-5657
>>>>>> 
>>>> 
>> 
> 