Re: Kafka streaming changelog topic max.message.bytes exception

2016-11-10 Thread saiprasad mishra
Hi Sachin

Here is a possible general approach which might work for you in the absence of
any broadcast variable, with everything being local state, and if you can
adjust your process to do everything before the aggregation.

Basically the idea here is to use a custom topology with a custom processor
backed by a persistent state store, and use it to transform the data and send
it to a result topic so that the aggregation becomes easier. You can change the
incoming data based on the old data in this state store for a given key, or
process it however you want, and then forward it to the final topic.

Below is the link to a sample custom processor:

https://apache.googlesource.com/kafka/+/0.10.0/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java?autodive=0%2F%2F

Note that in this class the store is accessed in init(); its name should be
"oldvalues" as per the pseudo code below.

It could be done something like the below (if you use the TopologyBuilder API).

Some pseudo code, which you might have already seen in the docs:

builder.addSource("source", "sourcetopic");

builder.addProcessor("myprocessor", WordCountProcessorDemo::new, "source");

StateStoreSupplier oldValueStore = Stores.create("oldvalues")
    .withKeys(stringSerde).withValues(stringSerde).persistent().build();

builder.addStateStore(oldValueStore, "myprocessor");

builder.addSink("sink", "toAggregationTopic", "myprocessor");

Now create the final KTable out of this toAggregationTopic which has the
cleaned data.
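
In case it helps, here is a rough, untested sketch of what such a processor
could look like (the class name and the diffing helper are hypothetical, and
String keys/values are assumed); something like this could be registered via
addProcessor() in place of WordCountProcessorDemo above:

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

public class OldValueCleanupProcessor implements Processor<String, String> {

    private ProcessorContext context;
    private KeyValueStore<String, String> oldValues;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.context = context;
        // must match the store name used in Stores.create("oldvalues")
        this.oldValues = (KeyValueStore<String, String>) context.getStateStore("oldvalues");
    }

    @Override
    public void process(String key, String value) {
        String old = oldValues.get(key);
        // adjust the incoming value based on what was seen before for this key,
        // e.g. drop the parts that were already processed (application specific)
        String cleaned = (old == null) ? value : removeAlreadySeen(value, old);
        oldValues.put(key, value);
        // forward the cleaned record so it ends up in the sink / aggregation topic
        context.forward(key, cleaned);
    }

    private String removeAlreadySeen(String value, String old) {
        // placeholder for whatever diffing logic the application needs
        return value;
    }

    @Override
    public void punctuate(long timestamp) {
        // nothing scheduled in this sketch
    }

    @Override
    public void close() {
        // nothing to clean up; the store is managed by Kafka Streams
    }
}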


Hope this helps; ignore this if I have misunderstood your use case.

Regards

Sai








On Thu, Nov 10, 2016 at 8:40 PM, Sachin Mittal  wrote:

> Hi,
> On the subject of deleting values from list I have following toplogy
> aggregate(old list) = new list -> left join (another list) -> output sink.
>
> While processing the output list I know which values in the list are old
> and can be removed.
> Is there any way to pass that information from downstream back to upstream?
> Any thoughts around how can I pass this information.
>
> One thing I can think off is that I can set some global variable in the
> output sink.
>
> So next time aggregate function is run it can lookup the global variable
> and remove items from the list.
> So new list = old list + new value added - old values removed.
>
> In spark we have something like broadcast variables to do the same.
>
> Is there any such similar concept in kafka streaming.
>
> This way we can keep the changelog topic message from growing and prevent
> the max message bytes exception.
>
> Thanks
> Sachin
>
>
>
>
>
> On Fri, Nov 11, 2016 at 1:02 AM, Matthias J. Sax 
> wrote:
>
> >
> > Sachin,
> >
> > my commend about deleting was about deleting values from the list, not
> > about deleting the whole key/value record.
> >
> > If you want to delete a whole key/value record it there is not update
> > for it for some time, you can combine compaction with retention. You
> > need to alter the configuration of the changelog topics and set
> >
> > cleanup.policy=compact,delete
> >
> > Than, retention.ms will be applied to the changelog, too.
> >
> >
> > - -Matthias
> >
> > On 11/10/16 3:17 AM, Sachin Mittal wrote:
> > > Hi, As per Eno suggestion I have pre-created internal changelog
> > > topics with increased max.message.bytes config to handle big
> > > messages that gets incremented over the time.
> > >
> > > As Matthias has pointed that we cannot use retention.ms setting to
> > > delete older message data after a given time, is there a way to
> > > purge older messages from my changelog topic.
> > >
> > > Remember my changelog topic is key=list of objects and this grows
> > > with time.
> > >
> > > So I would like these to be deleted from time to time because I
> > > would have already consumed the objects so that key/value can be
> > > deleted. Later if I get a new object for the same key then that's a
> > > new message and old data has no use for the streaming application.
> > >
> > > So how can I achieve the following. Would retention.bytes help
> > > here?
> > >
> > > Is there a way if i can set expire after or something like that at
> > > message level and some kafka thread would purge those messages.
> > >
> > > Thanks Sachin
> > >
> > >
> > >
> > > On Wed, Nov 9, 2016 at 1:42 AM, Matthias J. Sax
> > >  wrote:
> > >
> > > My two cents:
> > >
> > > Changelog topics are compacted topics, thus they do not have a
> > > retention time (there is an exception for windowed KTable changlog
> > > topics that are compacted and do have a retention time though).
> > >
> > > However, I do not understand how changing retention time should
> > > fix the issue. If your list of values grows and exceed
> > > max.message.byte you will need to increase this parameter (or
> > > shrink you value).
> > >
> > > Besides this, Eno's answer is the way to go. In order to figure
> > > out internal topic names, you can use 

Re: Kafka streaming changelog topic max.message.bytes exception

2016-11-10 Thread Matthias J. Sax

There is no built-in support for broadcast variables. Loops are also
not supported -- the dataflow graph must be a DAG.

You could use an additional topic, plus an extra (user-instantiated)
producer and consumer (using manual topic partition assignment!) within
your code to build your own feedback/broadcast loop.
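
To make that a bit more concrete, below is a minimal, untested sketch of such a
feedback consumer (topic name, partition, and settings are placeholders, and the
actual "remember what to drop" logic is up to the application):

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class FeedbackConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> feedback = new KafkaConsumer<>(props)) {
            // assign() instead of subscribe(): no consumer group rebalancing, so each
            // instance reads exactly the feedback partition(s) it is responsible for
            feedback.assign(Collections.singletonList(new TopicPartition("feedback-topic", 0)));

            while (true) {
                ConsumerRecords<String, String> records = feedback.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    // e.g. remember which values may be dropped from the aggregated list
                    System.out.println("feedback for key " + record.key() + ": " + record.value());
                }
            }
        }
    }
}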


-Matthias


On 11/10/16 8:40 PM, Sachin Mittal wrote:
> Hi, On the subject of deleting values from list I have following
> toplogy aggregate(old list) = new list -> left join (another list)
> -> output sink.
> 
> While processing the output list I know which values in the list
> are old and can be removed. Is there any way to pass that
> information from downstream back to upstream? Any thoughts around
> how can I pass this information.
> 
> One thing I can think off is that I can set some global variable in
> the output sink.
> 
> So next time aggregate function is run it can lookup the global
> variable and remove items from the list. So new list = old list +
> new value added - old values removed.
> 
> In spark we have something like broadcast variables to do the
> same.
> 
> Is there any such similar concept in kafka streaming.
> 
> This way we can keep the changelog topic message from growing and
> prevent the max message bytes exception.
> 
> Thanks Sachin
> 
> 
> 
> 
> 
> On Fri, Nov 11, 2016 at 1:02 AM, Matthias J. Sax
>  wrote:
> 
> Sachin,
> 
> my commend about deleting was about deleting values from the list,
> not about deleting the whole key/value record.
> 
> If you want to delete a whole key/value record it there is not
> update for it for some time, you can combine compaction with
> retention. You need to alter the configuration of the changelog
> topics and set
> 
> cleanup.policy=compact,delete
> 
> Than, retention.ms will be applied to the changelog, too.
> 
> 
> -Matthias
> 
> On 11/10/16 3:17 AM, Sachin Mittal wrote:
 Hi, As per Eno suggestion I have pre-created internal
 changelog topics with increased max.message.bytes config to
 handle big messages that gets incremented over the time.
 
 As Matthias has pointed that we cannot use retention.ms
 setting to delete older message data after a given time, is
 there a way to purge older messages from my changelog topic.
 
 Remember my changelog topic is key=list of objects and this
 grows with time.
 
 So I would like these to be deleted from time to time because
 I would have already consumed the objects so that key/value
 can be deleted. Later if I get a new object for the same key
 then that's a new message and old data has no use for the
 streaming application.
 
 So how can I achieve the following. Would retention.bytes
 help here?
 
 Is there a way if i can set expire after or something like
 that at message level and some kafka thread would purge those
 messages.
 
 Thanks Sachin
 
 
 
 On Wed, Nov 9, 2016 at 1:42 AM, Matthias J. Sax 
  wrote:
 
 My two cents:
 
 Changelog topics are compacted topics, thus they do not have
 a retention time (there is an exception for windowed KTable
 changlog topics that are compacted and do have a retention
 time though).
 
 However, I do not understand how changing retention time
 should fix the issue. If your list of values grows and
 exceed max.message.byte you will need to increase this
 parameter (or shrink you value).
 
 Besides this, Eno's answer is the way to go. In order to
 figure out internal topic names, you can use
 KafkaStreams#toString().
 
 
 -Matthias
 
 
 
 On 11/8/16 11:14 AM, Eno Thereska wrote:
>>> Hi Sachin,
>>> 
>>> One option right now would be to precreate all
>>> internal topics in Kafka, and only after that start the
>>> Kafka Streams application. This would require you
>>> knowing the internal name of the topics (in this case
>>> you probably already know it, but I agree that in
>>> general this is a bit cumbersome).
>>> 
>>> Eno
>>> 
 On 8 Nov 2016, at 18:10, Sachin Mittal 
  wrote:
 
 Per message payload size. The basic question is how
 can I control the internal change log topics
 parameters so as to avoid these errors.
 
 
 On Tue, Nov 8, 2016 at 11:37 PM, R Krishna 
  wrote:
 
> Are you talking about total messages and therefore
> size or per message payload size.
> 
> On Tue, Nov 8, 2016 at 10:00 AM, Sachin Mittal 
>  wrote:
> 
>> Message size itself increases over the time.
>> 
>> Message is something like key=[list on objects]
>> 
>> This increases with time and then at a point

Re: Kafka streaming changelog topic max.message.bytes exception

2016-11-10 Thread Sachin Mittal
Hi,
On the subject of deleting values from the list, I have the following topology:
aggregate(old list) = new list -> left join (another list) -> output sink.

While processing the output list I know which values in the list are old
and can be removed.
Is there any way to pass that information from downstream back to upstream?
Any thoughts on how I can pass this information?

One thing I can think of is that I can set some global variable in the
output sink.

So the next time the aggregate function is run it can look up the global
variable and remove items from the list.
So new list = old list + new values added - old values removed.

In Spark we have something like broadcast variables to do the same.

Is there any similar concept in Kafka Streams?

This way we can keep the changelog topic messages from growing and prevent
the max.message.bytes exception.

Thanks
Sachin





On Fri, Nov 11, 2016 at 1:02 AM, Matthias J. Sax 
wrote:

>
> Sachin,
>
> my commend about deleting was about deleting values from the list, not
> about deleting the whole key/value record.
>
> If you want to delete a whole key/value record it there is not update
> for it for some time, you can combine compaction with retention. You
> need to alter the configuration of the changelog topics and set
>
> cleanup.policy=compact,delete
>
> Than, retention.ms will be applied to the changelog, too.
>
>
> - -Matthias
>
> On 11/10/16 3:17 AM, Sachin Mittal wrote:
> > Hi, As per Eno suggestion I have pre-created internal changelog
> > topics with increased max.message.bytes config to handle big
> > messages that gets incremented over the time.
> >
> > As Matthias has pointed that we cannot use retention.ms setting to
> > delete older message data after a given time, is there a way to
> > purge older messages from my changelog topic.
> >
> > Remember my changelog topic is key=list of objects and this grows
> > with time.
> >
> > So I would like these to be deleted from time to time because I
> > would have already consumed the objects so that key/value can be
> > deleted. Later if I get a new object for the same key then that's a
> > new message and old data has no use for the streaming application.
> >
> > So how can I achieve the following. Would retention.bytes help
> > here?
> >
> > Is there a way if i can set expire after or something like that at
> > message level and some kafka thread would purge those messages.
> >
> > Thanks Sachin
> >
> >
> >
> > On Wed, Nov 9, 2016 at 1:42 AM, Matthias J. Sax
> >  wrote:
> >
> > My two cents:
> >
> > Changelog topics are compacted topics, thus they do not have a
> > retention time (there is an exception for windowed KTable changlog
> > topics that are compacted and do have a retention time though).
> >
> > However, I do not understand how changing retention time should
> > fix the issue. If your list of values grows and exceed
> > max.message.byte you will need to increase this parameter (or
> > shrink you value).
> >
> > Besides this, Eno's answer is the way to go. In order to figure
> > out internal topic names, you can use KafkaStreams#toString().
> >
> >
> > -Matthias
> >
> >
> >
> > On 11/8/16 11:14 AM, Eno Thereska wrote:
>  Hi Sachin,
> 
>  One option right now would be to precreate all internal
>  topics in Kafka, and only after that start the Kafka Streams
>  application. This would require you knowing the internal name
>  of the topics (in this case you probably already know it, but
>  I agree that in general this is a bit cumbersome).
> 
>  Eno
> 
> > On 8 Nov 2016, at 18:10, Sachin Mittal
> >  wrote:
> >
> > Per message payload size. The basic question is how can I
> > control the internal change log topics parameters so as to
> > avoid these errors.
> >
> >
> > On Tue, Nov 8, 2016 at 11:37 PM, R Krishna
> >  wrote:
> >
> >> Are you talking about total messages and therefore size
> >> or per message payload size.
> >>
> >> On Tue, Nov 8, 2016 at 10:00 AM, Sachin Mittal
> >>  wrote:
> >>
> >>> Message size itself increases over the time.
> >>>
> >>> Message is something like key=[list on objects]
> >>>
> >>> This increases with time and then at a point kafka is
> >>> not able to add any message to its topic because
> >>> message size is greater than max.message.bytes. Since
> >>> this is an internal topic based off a table I don't
> >>> know how can I control this topic.
> >>>
> >>> If I can set some retention.ms for this topic then I
> >>> can purge old messages thereby ensuring that message
> >>> size stays within limit.
> >>>
> >>> Thanks Sachin
> >>>
> >>>
> >>>
> >>> On Tue, Nov 8, 2016 at 11:22 PM, Eno Thereska
> >>>  wrote:
> 

Re: Kafka streaming changelog topic max.message.bytes exception

2016-11-10 Thread Matthias J. Sax

Sachin,

my comment about deleting was about deleting values from the list, not
about deleting the whole key/value record.

If you want to delete a whole key/value record if there is no update
for it for some time, you can combine compaction with retention. You
need to alter the configuration of the changelog topics and set

cleanup.policy=compact,delete

Then, retention.ms will be applied to the changelog, too.


-Matthias

On 11/10/16 3:17 AM, Sachin Mittal wrote:
> Hi, As per Eno suggestion I have pre-created internal changelog
> topics with increased max.message.bytes config to handle big
> messages that gets incremented over the time.
> 
> As Matthias has pointed that we cannot use retention.ms setting to
> delete older message data after a given time, is there a way to
> purge older messages from my changelog topic.
> 
> Remember my changelog topic is key=list of objects and this grows
> with time.
> 
> So I would like these to be deleted from time to time because I
> would have already consumed the objects so that key/value can be
> deleted. Later if I get a new object for the same key then that's a
> new message and old data has no use for the streaming application.
> 
> So how can I achieve the following. Would retention.bytes help
> here?
> 
> Is there a way if i can set expire after or something like that at
> message level and some kafka thread would purge those messages.
> 
> Thanks Sachin
> 
> 
> 
> On Wed, Nov 9, 2016 at 1:42 AM, Matthias J. Sax
>  wrote:
> 
> My two cents:
> 
> Changelog topics are compacted topics, thus they do not have a 
> retention time (there is an exception for windowed KTable changlog 
> topics that are compacted and do have a retention time though).
> 
> However, I do not understand how changing retention time should
> fix the issue. If your list of values grows and exceed
> max.message.byte you will need to increase this parameter (or
> shrink you value).
> 
> Besides this, Eno's answer is the way to go. In order to figure
> out internal topic names, you can use KafkaStreams#toString().
> 
> 
> -Matthias
> 
> 
> 
> On 11/8/16 11:14 AM, Eno Thereska wrote:
 Hi Sachin,
 
 One option right now would be to precreate all internal
 topics in Kafka, and only after that start the Kafka Streams
 application. This would require you knowing the internal name
 of the topics (in this case you probably already know it, but
 I agree that in general this is a bit cumbersome).
 
 Eno
 
> On 8 Nov 2016, at 18:10, Sachin Mittal
>  wrote:
> 
> Per message payload size. The basic question is how can I
> control the internal change log topics parameters so as to
> avoid these errors.
> 
> 
> On Tue, Nov 8, 2016 at 11:37 PM, R Krishna
>  wrote:
> 
>> Are you talking about total messages and therefore size
>> or per message payload size.
>> 
>> On Tue, Nov 8, 2016 at 10:00 AM, Sachin Mittal 
>>  wrote:
>> 
>>> Message size itself increases over the time.
>>> 
>>> Message is something like key=[list on objects]
>>> 
>>> This increases with time and then at a point kafka is
>>> not able to add any message to its topic because
>>> message size is greater than max.message.bytes. Since
>>> this is an internal topic based off a table I don't
>>> know how can I control this topic.
>>> 
>>> If I can set some retention.ms for this topic then I
>>> can purge old messages thereby ensuring that message
>>> size stays within limit.
>>> 
>>> Thanks Sachin
>>> 
>>> 
>>> 
>>> On Tue, Nov 8, 2016 at 11:22 PM, Eno Thereska 
>>>  wrote:
>>> 
 Hi Sachin,
 
 Could you clarify what you mean by "message size 
 increases"? Are
>> messages
 going to the changelog topic increasing in size? Or
 is the changelog
>>> topic
 getting full?
 
 Thanks Eno
 
> On 8 Nov 2016, at 16:49, Sachin Mittal 
>  wrote:
> 
> Hi, We are using aggregation by key on a kstream
> to create a ktable. As I read from 
> https://cwiki.apache.org/confluence/display/KAFKA/
 Kafka+Streams%3A+Internal+Data+Management
> it creates an internal changelog topic.
> 
> However over the time the streaming application is
> run message size increases and it starts throwing 
> max.message.bytes exception.
> 
> Is there a way to control the retention.ms time
> for internal
>> changelog
> topics so that messages are purged before they
> exceed this size.
> 
> If not is there a way to control or avoid such an
> error.
> 
> 

Re: Kafka streaming changelog topic max.message.bytes exception

2016-11-10 Thread Damian Guy
Hi Sachin,

You can achieve what you want by setting the correct cleanup.policy on
these topics.
In this case you want cleanup.policy=compact,delete - you'll also want to
set retention.ms and/or retention.bytes.

The topic will then be compacted, but it will also delete any segments
based on the retention settings.
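
For example, a changelog topic prepared this way could end up with topic-level
settings along these lines (the values are purely illustrative):

cleanup.policy=compact,delete
retention.ms=86400000
retention.bytes=1073741824

With that, old segments are deleted after roughly a day or once a partition
exceeds about 1 GB, while compaction still keeps the latest value per key.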

You can look here for further details:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-71%3A+Enable+log+compaction+and+deletion+to+co-exist

HTH,
Damian

On Thu, 10 Nov 2016 at 11:17 Sachin Mittal  wrote:

> Hi,
> As per Eno suggestion I have pre-created internal changelog topics with
> increased max.message.bytes config to handle big messages that gets
> incremented over the time.
>
> As Matthias has pointed that we cannot use retention.ms setting to delete
> older message data after a given time, is there a way to purge older
> messages from my changelog topic.
>
> Remember my changelog topic is
> key=list of objects
> and this grows with time.
>
> So I would like these to be deleted from time to time because I would have
> already consumed the objects so that key/value can be deleted.
> Later if I get a new object for the same key then that's a new message and
> old data has no use for the streaming application.
>
> So how can I achieve the following.
> Would retention.bytes help here?
>
> Is there a way if i can set expire after or something like that at message
> level and some kafka thread would purge those messages.
>
> Thanks
> Sachin
>
>
>
> On Wed, Nov 9, 2016 at 1:42 AM, Matthias J. Sax 
> wrote:
>
> >
> > My two cents:
> >
> > Changelog topics are compacted topics, thus they do not have a
> > retention time (there is an exception for windowed KTable changlog
> > topics that are compacted and do have a retention time though).
> >
> > However, I do not understand how changing retention time should fix
> > the issue. If your list of values grows and exceed max.message.byte
> > you will need to increase this parameter (or shrink you value).
> >
> > Besides this, Eno's answer is the way to go. In order to figure out
> > internal topic names, you can use KafkaStreams#toString().
> >
> >
> > - -Matthias
> >
> >
> >
> > On 11/8/16 11:14 AM, Eno Thereska wrote:
> > > Hi Sachin,
> > >
> > > One option right now would be to precreate all internal topics in
> > > Kafka, and only after that start the Kafka Streams application.
> > > This would require you knowing the internal name of the topics (in
> > > this case you probably already know it, but I agree that in general
> > > this is a bit cumbersome).
> > >
> > > Eno
> > >
> > >> On 8 Nov 2016, at 18:10, Sachin Mittal 
> > >> wrote:
> > >>
> > >> Per message payload size. The basic question is how can I control
> > >> the internal change log topics parameters so as to avoid these
> > >> errors.
> > >>
> > >>
> > >> On Tue, Nov 8, 2016 at 11:37 PM, R Krishna 
> > >> wrote:
> > >>
> > >>> Are you talking about total messages and therefore size or per
> > >>> message payload size.
> > >>>
> > >>> On Tue, Nov 8, 2016 at 10:00 AM, Sachin Mittal
> > >>>  wrote:
> > >>>
> >  Message size itself increases over the time.
> > 
> >  Message is something like key=[list on objects]
> > 
> >  This increases with time and then at a point kafka is not
> >  able to add any message to its topic because message size is
> >  greater than max.message.bytes. Since this is an internal
> >  topic based off a table I don't know how can I control this
> >  topic.
> > 
> >  If I can set some retention.ms for this topic then I can
> >  purge old messages thereby ensuring that message size stays
> >  within limit.
> > 
> >  Thanks Sachin
> > 
> > 
> > 
> >  On Tue, Nov 8, 2016 at 11:22 PM, Eno Thereska
> >   wrote:
> > 
> > > Hi Sachin,
> > >
> > > Could you clarify what you mean by "message size
> > > increases"? Are
> > >>> messages
> > > going to the changelog topic increasing in size? Or is the
> > > changelog
> >  topic
> > > getting full?
> > >
> > > Thanks Eno
> > >
> > >> On 8 Nov 2016, at 16:49, Sachin Mittal
> > >>  wrote:
> > >>
> > >> Hi, We are using aggregation by key on a kstream to
> > >> create a ktable. As I read from
> > >> https://cwiki.apache.org/confluence/display/KAFKA/
> > > Kafka+Streams%3A+Internal+Data+Management
> > >> it creates an internal changelog topic.
> > >>
> > >> However over the time the streaming application is run
> > >> message size increases and it starts throwing
> > >> max.message.bytes exception.
> > >>
> > >> Is there a way to control the retention.ms time for
> > >> internal
> > >>> changelog
> > >> topics so that messages are 

Re: Kafka streaming changelog topic max.message.bytes exception

2016-11-10 Thread Sachin Mittal
Hi,
As per Eno's suggestion I have pre-created internal changelog topics with
an increased max.message.bytes config to handle big messages that grow
over time.

As Matthias has pointed out, we cannot use the retention.ms setting to delete
older message data after a given time, so is there a way to purge older
messages from my changelog topic?

Remember my changelog topic is
key=list of objects
and this grows with time.

So I would like these to be deleted from time to time because I would have
already consumed the objects, so that key/value can be deleted.
Later if I get a new object for the same key then that's a new message and
the old data has no use for the streaming application.

So how can I achieve this?
Would retention.bytes help here?

Is there a way I can set an expire-after or something like that at the message
level so that some Kafka thread would purge those messages?

Thanks
Sachin



On Wed, Nov 9, 2016 at 1:42 AM, Matthias J. Sax 
wrote:

>
> My two cents:
>
> Changelog topics are compacted topics, thus they do not have a
> retention time (there is an exception for windowed KTable changlog
> topics that are compacted and do have a retention time though).
>
> However, I do not understand how changing retention time should fix
> the issue. If your list of values grows and exceed max.message.byte
> you will need to increase this parameter (or shrink you value).
>
> Besides this, Eno's answer is the way to go. In order to figure out
> internal topic names, you can use KafkaStreams#toString().
>
>
> - -Matthias
>
>
>
> On 11/8/16 11:14 AM, Eno Thereska wrote:
> > Hi Sachin,
> >
> > One option right now would be to precreate all internal topics in
> > Kafka, and only after that start the Kafka Streams application.
> > This would require you knowing the internal name of the topics (in
> > this case you probably already know it, but I agree that in general
> > this is a bit cumbersome).
> >
> > Eno
> >
> >> On 8 Nov 2016, at 18:10, Sachin Mittal 
> >> wrote:
> >>
> >> Per message payload size. The basic question is how can I control
> >> the internal change log topics parameters so as to avoid these
> >> errors.
> >>
> >>
> >> On Tue, Nov 8, 2016 at 11:37 PM, R Krishna 
> >> wrote:
> >>
> >>> Are you talking about total messages and therefore size or per
> >>> message payload size.
> >>>
> >>> On Tue, Nov 8, 2016 at 10:00 AM, Sachin Mittal
> >>>  wrote:
> >>>
>  Message size itself increases over the time.
> 
>  Message is something like key=[list on objects]
> 
>  This increases with time and then at a point kafka is not
>  able to add any message to its topic because message size is
>  greater than max.message.bytes. Since this is an internal
>  topic based off a table I don't know how can I control this
>  topic.
> 
>  If I can set some retention.ms for this topic then I can
>  purge old messages thereby ensuring that message size stays
>  within limit.
> 
>  Thanks Sachin
> 
> 
> 
>  On Tue, Nov 8, 2016 at 11:22 PM, Eno Thereska
>   wrote:
> 
> > Hi Sachin,
> >
> > Could you clarify what you mean by "message size
> > increases"? Are
> >>> messages
> > going to the changelog topic increasing in size? Or is the
> > changelog
>  topic
> > getting full?
> >
> > Thanks Eno
> >
> >> On 8 Nov 2016, at 16:49, Sachin Mittal
> >>  wrote:
> >>
> >> Hi, We are using aggregation by key on a kstream to
> >> create a ktable. As I read from
> >> https://cwiki.apache.org/confluence/display/KAFKA/
> > Kafka+Streams%3A+Internal+Data+Management
> >> it creates an internal changelog topic.
> >>
> >> However over the time the streaming application is run
> >> message size increases and it starts throwing
> >> max.message.bytes exception.
> >>
> >> Is there a way to control the retention.ms time for
> >> internal
> >>> changelog
> >> topics so that messages are purged before they exceed
> >> this size.
> >>
> >> If not is there a way to control or avoid such an error.
> >>
> >> Thanks Sachin
> >
> >
> 
> >>>
> >>>
> >>>
> >>> -- Radha Krishna, Proddaturi 253-234-5657
> >>>
> >

Re: Kafka streaming changelog topic max.message.bytes exception

2016-11-08 Thread Matthias J. Sax

My two cents:

Changelog topics are compacted topics, thus they do not have a
retention time (there is an exception for windowed KTable changelog
topics, which are compacted and do have a retention time though).

However, I do not understand how changing the retention time should fix
the issue. If your list of values grows and exceeds max.message.bytes
you will need to increase this parameter (or shrink your value).

Besides this, Eno's answer is the way to go. In order to figure out
the internal topic names, you can use KafkaStreams#toString().
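
For example (assuming "builder" and "props" are the topology builder and streams
configuration already used in the application):

KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();
// the printed topology includes the generated internal topic names
System.out.println(streams.toString());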


-Matthias



On 11/8/16 11:14 AM, Eno Thereska wrote:
> Hi Sachin,
> 
> One option right now would be to precreate all internal topics in
> Kafka, and only after that start the Kafka Streams application.
> This would require you knowing the internal name of the topics (in
> this case you probably already know it, but I agree that in general
> this is a bit cumbersome).
> 
> Eno
> 
>> On 8 Nov 2016, at 18:10, Sachin Mittal 
>> wrote:
>> 
>> Per message payload size. The basic question is how can I control
>> the internal change log topics parameters so as to avoid these
>> errors.
>> 
>> 
>> On Tue, Nov 8, 2016 at 11:37 PM, R Krishna 
>> wrote:
>> 
>>> Are you talking about total messages and therefore size or per
>>> message payload size.
>>> 
>>> On Tue, Nov 8, 2016 at 10:00 AM, Sachin Mittal
>>>  wrote:
>>> 
 Message size itself increases over the time.
 
 Message is something like key=[list on objects]
 
 This increases with time and then at a point kafka is not
 able to add any message to its topic because message size is
 greater than max.message.bytes. Since this is an internal
 topic based off a table I don't know how can I control this
 topic.
 
 If I can set some retention.ms for this topic then I can
 purge old messages thereby ensuring that message size stays
 within limit.
 
 Thanks Sachin
 
 
 
 On Tue, Nov 8, 2016 at 11:22 PM, Eno Thereska
  wrote:
 
> Hi Sachin,
> 
> Could you clarify what you mean by "message size
> increases"? Are
>>> messages
> going to the changelog topic increasing in size? Or is the
> changelog
 topic
> getting full?
> 
> Thanks Eno
> 
>> On 8 Nov 2016, at 16:49, Sachin Mittal
>>  wrote:
>> 
>> Hi, We are using aggregation by key on a kstream to
>> create a ktable. As I read from 
>> https://cwiki.apache.org/confluence/display/KAFKA/
> Kafka+Streams%3A+Internal+Data+Management
>> it creates an internal changelog topic.
>> 
>> However over the time the streaming application is run
>> message size increases and it starts throwing
>> max.message.bytes exception.
>> 
>> Is there a way to control the retention.ms time for
>> internal
>>> changelog
>> topics so that messages are purged before they exceed
>> this size.
>> 
>> If not is there a way to control or avoid such an error.
>> 
>> Thanks Sachin
> 
> 
 
>>> 
>>> 
>>> 
>>> -- Radha Krishna, Proddaturi 253-234-5657
>>> 
> 


Re: Kafka streaming changelog topic max.message.bytes exception

2016-11-08 Thread Eno Thereska
Hi Sachin,

One option right now would be to pre-create all internal topics in Kafka, and
only after that start the Kafka Streams application. This would require you to
know the internal names of the topics (in this case you probably already know
them, but I agree that in general this is a bit cumbersome).
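
(For reference, the changelog topics created by Kafka Streams follow the pattern
<application.id>-<state store name>-changelog; e.g. an application with
application.id "my-app" aggregating into a store named "counts" would get a topic
called "my-app-counts-changelog" -- the names here are only illustrative.)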

Eno

> On 8 Nov 2016, at 18:10, Sachin Mittal  wrote:
> 
> Per message payload size. The basic question is how can I control the
> internal change log topics parameters so as to avoid these errors.
> 
> 
> On Tue, Nov 8, 2016 at 11:37 PM, R Krishna  wrote:
> 
>> Are you talking about total messages and therefore size or per message
>> payload size.
>> 
>> On Tue, Nov 8, 2016 at 10:00 AM, Sachin Mittal  wrote:
>> 
>>> Message size itself increases over the time.
>>> 
>>> Message is something like
>>> key=[list on objects]
>>> 
>>> This increases with time and then at a point kafka is not able to add any
>>> message to its topic because message size is greater than
>>> max.message.bytes.
>>> Since this is an internal topic based off a table I don't know how can I
>>> control this topic.
>>> 
>>> If I can set some retention.ms for this topic then I can purge old
>>> messages
>>> thereby ensuring that message size stays within limit.
>>> 
>>> Thanks
>>> Sachin
>>> 
>>> 
>>> 
>>> On Tue, Nov 8, 2016 at 11:22 PM, Eno Thereska 
>>> wrote:
>>> 
 Hi Sachin,
 
 Could you clarify what you mean by "message size increases"? Are
>> messages
 going to the changelog topic increasing in size? Or is the changelog
>>> topic
 getting full?
 
 Thanks
 Eno
 
> On 8 Nov 2016, at 16:49, Sachin Mittal  wrote:
> 
> Hi,
> We are using aggregation by key on a kstream to create a ktable.
> As I read from
> https://cwiki.apache.org/confluence/display/KAFKA/
 Kafka+Streams%3A+Internal+Data+Management
> it creates an internal changelog topic.
> 
> However over the time the streaming application is run message size
> increases and it starts throwing max.message.bytes exception.
> 
> Is there a way to control the retention.ms time for internal
>> changelog
> topics so that messages are purged before they exceed this size.
> 
> If not is there a way to control or avoid such an error.
> 
> Thanks
> Sachin
 
 
>>> 
>> 
>> 
>> 
>> --
>> Radha Krishna, Proddaturi
>> 253-234-5657
>> 



Re: Kafka streaming changelog topic max.message.bytes exception

2016-11-08 Thread Sachin Mittal
Per-message payload size. The basic question is how I can control the
internal changelog topics' parameters so as to avoid these errors.


On Tue, Nov 8, 2016 at 11:37 PM, R Krishna  wrote:

> Are you talking about total messages and therefore size or per message
> payload size.
>
> On Tue, Nov 8, 2016 at 10:00 AM, Sachin Mittal  wrote:
>
> > Message size itself increases over the time.
> >
> > Message is something like
> > key=[list on objects]
> >
> > This increases with time and then at a point kafka is not able to add any
> > message to its topic because message size is greater than
> > max.message.bytes.
> > Since this is an internal topic based off a table I don't know how can I
> > control this topic.
> >
> > If I can set some retention.ms for this topic then I can purge old
> > messages
> > thereby ensuring that message size stays within limit.
> >
> > Thanks
> > Sachin
> >
> >
> >
> > On Tue, Nov 8, 2016 at 11:22 PM, Eno Thereska 
> > wrote:
> >
> > > Hi Sachin,
> > >
> > > Could you clarify what you mean by "message size increases"? Are
> messages
> > > going to the changelog topic increasing in size? Or is the changelog
> > topic
> > > getting full?
> > >
> > > Thanks
> > > Eno
> > >
> > > > On 8 Nov 2016, at 16:49, Sachin Mittal  wrote:
> > > >
> > > > Hi,
> > > > We are using aggregation by key on a kstream to create a ktable.
> > > > As I read from
> > > > https://cwiki.apache.org/confluence/display/KAFKA/
> > > Kafka+Streams%3A+Internal+Data+Management
> > > > it creates an internal changelog topic.
> > > >
> > > > However over the time the streaming application is run message size
> > > > increases and it starts throwing max.message.bytes exception.
> > > >
> > > > Is there a way to control the retention.ms time for internal
> changelog
> > > > topics so that messages are purged before they exceed this size.
> > > >
> > > > If not is there a way to control or avoid such an error.
> > > >
> > > > Thanks
> > > > Sachin
> > >
> > >
> >
>
>
>
> --
> Radha Krishna, Proddaturi
> 253-234-5657
>


Re: Kafka streaming changelog topic max.message.bytes exception

2016-11-08 Thread R Krishna
Are you talking about total messages and therefore total size, or per-message
payload size?

On Tue, Nov 8, 2016 at 10:00 AM, Sachin Mittal  wrote:

> Message size itself increases over the time.
>
> Message is something like
> key=[list on objects]
>
> This increases with time and then at a point kafka is not able to add any
> message to its topic because message size is greater than
> max.message.bytes.
> Since this is an internal topic based off a table I don't know how can I
> control this topic.
>
> If I can set some retention.ms for this topic then I can purge old
> messages
> thereby ensuring that message size stays within limit.
>
> Thanks
> Sachin
>
>
>
> On Tue, Nov 8, 2016 at 11:22 PM, Eno Thereska 
> wrote:
>
> > Hi Sachin,
> >
> > Could you clarify what you mean by "message size increases"? Are messages
> > going to the changelog topic increasing in size? Or is the changelog
> topic
> > getting full?
> >
> > Thanks
> > Eno
> >
> > > On 8 Nov 2016, at 16:49, Sachin Mittal  wrote:
> > >
> > > Hi,
> > > We are using aggregation by key on a kstream to create a ktable.
> > > As I read from
> > > https://cwiki.apache.org/confluence/display/KAFKA/
> > Kafka+Streams%3A+Internal+Data+Management
> > > it creates an internal changelog topic.
> > >
> > > However over the time the streaming application is run message size
> > > increases and it starts throwing max.message.bytes exception.
> > >
> > > Is there a way to control the retention.ms time for internal changelog
> > > topics so that messages are purged before they exceed this size.
> > >
> > > If not is there a way to control or avoid such an error.
> > >
> > > Thanks
> > > Sachin
> >
> >
>



-- 
Radha Krishna, Proddaturi
253-234-5657


Re: Kafka streaming changelog topic max.message.bytes exception

2016-11-08 Thread Sachin Mittal
Message size itself increases over time.

Message is something like
key=[list of objects]

This increases with time and then at some point Kafka is not able to add any
message to its topic because the message size is greater than max.message.bytes.
Since this is an internal topic based off a table, I don't know how I can
control this topic.

If I can set some retention.ms for this topic then I can purge old messages,
thereby ensuring that the message size stays within the limit.

Thanks
Sachin



On Tue, Nov 8, 2016 at 11:22 PM, Eno Thereska 
wrote:

> Hi Sachin,
>
> Could you clarify what you mean by "message size increases"? Are messages
> going to the changelog topic increasing in size? Or is the changelog topic
> getting full?
>
> Thanks
> Eno
>
> > On 8 Nov 2016, at 16:49, Sachin Mittal  wrote:
> >
> > Hi,
> > We are using aggregation by key on a kstream to create a ktable.
> > As I read from
> > https://cwiki.apache.org/confluence/display/KAFKA/
> Kafka+Streams%3A+Internal+Data+Management
> > it creates an internal changelog topic.
> >
> > However over the time the streaming application is run message size
> > increases and it starts throwing max.message.bytes exception.
> >
> > Is there a way to control the retention.ms time for internal changelog
> > topics so that messages are purged before they exceed this size.
> >
> > If not is there a way to control or avoid such an error.
> >
> > Thanks
> > Sachin
>
>


Re: Kafka streaming changelog topic max.message.bytes exception

2016-11-08 Thread Eno Thereska
Hi Sachin,

Could you clarify what you mean by "message size increases"? Are messages going 
to the changelog topic increasing in size? Or is the changelog topic getting 
full? 

Thanks
Eno

> On 8 Nov 2016, at 16:49, Sachin Mittal  wrote:
> 
> Hi,
> We are using aggregation by key on a kstream to create a ktable.
> As I read from
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams%3A+Internal+Data+Management
> it creates an internal changelog topic.
> 
> However over the time the streaming application is run message size
> increases and it starts throwing max.message.bytes exception.
> 
> Is there a way to control the retention.ms time for internal changelog
> topics so that messages are purged before they exceed this size.
> 
> If not is there a way to control or avoid such an error.
> 
> Thanks
> Sachin



Kafka streaming changelog topic max.message.bytes exception

2016-11-08 Thread Sachin Mittal
Hi,
We are using aggregation by key on a kstream to create a ktable.
As I read from
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams%3A+Internal+Data+Management
it creates an internal changelog topic.

However, over the time the streaming application runs, the message size
increases and it starts throwing a max.message.bytes exception.

Is there a way to control the retention.ms time for internal changelog
topics so that messages are purged before they exceed this size?

If not, is there a way to control or avoid such an error?

Thanks
Sachin