Re: Kafka streaming changelog topic max.message.bytes exception

2016-11-10 Thread saiprasad mishra
Hi Sachin, Here is a possible general approach which might work for you in the absence of any broadcast variable, with everything being local state, and if you can adjust your process to do everything before the aggregation. Basically, the idea here is to use a custom topology with a custom processor…
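A minimal sketch of what such a custom topology could look like, using the low-level Processor API of Kafka Streams 0.10.x; the topic, store, and processor names are illustrative, not from this thread:

    import org.apache.kafka.streams.processor.Processor;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.processor.TopologyBuilder;
    import org.apache.kafka.streams.state.KeyValueStore;
    import org.apache.kafka.streams.state.Stores;

    // Processor that consults/updates local state before records reach the aggregation.
    class PreAggregateProcessor implements Processor<String, String> {
        private ProcessorContext context;
        private KeyValueStore<String, String> localState;

        @Override
        @SuppressWarnings("unchecked")
        public void init(ProcessorContext context) {
            this.context = context;
            this.localState = (KeyValueStore<String, String>) context.getStateStore("local-state");
        }

        @Override
        public void process(String key, String value) {
            localState.put(key, value);   // remember what this instance has seen
            context.forward(key, value);  // forward the (possibly trimmed) record
        }

        @Override
        public void punctuate(long timestamp) { }

        @Override
        public void close() { }
    }

    public class CustomTopologyExample {
        public static void main(String[] args) {
            // Wiring: source -> custom processor (with its local store) -> sink.
            TopologyBuilder builder = new TopologyBuilder();
            builder.addSource("source", "input-topic")
                   .addProcessor("pre-aggregate", PreAggregateProcessor::new, "source")
                   .addStateStore(
                       Stores.create("local-state")
                             .withStringKeys().withStringValues()
                             .persistent().build(),
                       "pre-aggregate")
                   .addSink("sink", "output-topic", "pre-aggregate");
            // ... pass builder plus a StreamsConfig to new KafkaStreams(...) and start it.
        }
    }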

Re: Kafka streaming changelog topic max.message.bytes exception

2016-11-10 Thread Matthias J. Sax
There is no built-in support for broadcast variables. Loops are also not supported -- the dataflow graph must be a DAG. You could use an additional topic, plus an extra (user-instantiated) producer and consumer (using manual topic partition assignment!)…
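A sketch of the side channel Matthias describes: a user-instantiated KafkaConsumer that manually assigns itself every partition of an extra topic, so each Streams instance sees all records (a poor man's broadcast). The topic name and configs are illustrative:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.TopicPartition;

    public class BroadcastSideChannel {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                      "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                      "org.apache.kafka.common.serialization.StringDeserializer");

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

            // Manual assignment (no consumer group): every instance reads every
            // partition, so all instances see the full "broadcast" topic.
            List<TopicPartition> partitions = new ArrayList<>();
            for (PartitionInfo info : consumer.partitionsFor("broadcast-topic")) {
                partitions.add(new TopicPartition(info.topic(), info.partition()));
            }
            consumer.assign(partitions);
            consumer.seekToBeginning(partitions);  // replay history on startup

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    // update local state / the "broadcast variable" here
                    System.out.println(record.key() + " -> " + record.value());
                }
            }
        }
    }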

Re: Kafka streaming changelog topic max.message.bytes exception

2016-11-10 Thread Sachin Mittal
Hi, On the subject of deleting values from a list, I have the following topology: aggregate(old list) = new list -> left join (another list) -> output sink. While processing the output list I know which values in the list are old and can be removed. Is there any way to pass that information from downstream…
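To make the question concrete, a sketch of that topology in the 0.10.x DSL; stream (a KStream<String, String>), otherTable, listSerde, outputSerde, and the merge logic are placeholders, not code from this thread:

    // aggregate(old list) = new list -> left join (another list) -> output sink
    KTable<String, List<String>> aggregated = stream
        .groupByKey()
        .aggregate(
            ArrayList::new,                                           // initializer: empty list
            (key, value, list) -> { list.add(value); return list; },  // the list only ever grows
            listSerde,                                                // placeholder Serde<List<String>>
            "list-store");

    aggregated
        .leftJoin(otherTable, (list, other) -> merge(list, other))   // placeholder join logic
        .to(Serdes.String(), outputSerde, "output-topic");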

Re: Kafka streaming changelog topic max.message.bytes exception

2016-11-10 Thread Matthias J. Sax
Sachin, my comment about deleting was about deleting values from the list, not about deleting the whole key/value record. If you want to delete a whole key/value record when there is no update for it for some time, you can combine compaction with retention…

Re: Kafka streaming changelog topic max.message.bytes exception

2016-11-10 Thread Damian Guy
Hi Sachin, You can achieve what you want by setting the correct cleanup.policy on these topics. In this case you want cleanup.policy=compact,delete -- you'll also want to set retention.ms and/or retention.bytes. The topic will then be compacted, but it will also delete any segments based on the retention…
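For reference, applying Damian's suggestion to an existing changelog topic with the stock CLI could look roughly like this (the topic name and retention value are illustrative; the brackets are needed because the policy value itself contains a comma, and older kafka-configs releases may not accept this list syntax):

    bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
      --entity-type topics --entity-name my-app-my-store-changelog \
      --add-config 'cleanup.policy=[compact,delete],retention.ms=604800000'

Note that running compaction and deletion together on one topic requires a broker that supports the combined policy (added in 0.10.1 via KIP-71).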

Re: Kafka streaming changelog topic max.message.bytes exception

2016-11-10 Thread Sachin Mittal
Hi, As per Eno's suggestion I have pre-created the internal changelog topics with an increased max.message.bytes config to handle big messages that grow over time. As Matthias has pointed out, we cannot use the retention.ms setting to delete older message data after a given time; is there a way…

Re: Kafka streaming changelog topic max.message.bytes exception

2016-11-08 Thread Matthias J. Sax
My two cents: Changelog topics are compacted topics, thus they do not have a retention time (there is an exception for windowed KTable changelog topics, which are compacted and do have a retention time though). However, I do not understand how changing…
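To see which cleanup policy and overrides a given changelog topic actually carries, the per-topic config overrides can be listed (topic name illustrative):

    bin/kafka-configs.sh --zookeeper localhost:2181 --describe \
      --entity-type topics --entity-name my-app-my-store-changelog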

Re: Kafka streaming changelog topic max.message.bytes exception

2016-11-08 Thread Eno Thereska
Hi Sachin, One option right now would be to pre-create all internal topics in Kafka, and only after that start the Kafka Streams application. This would require you to know the internal names of the topics (in this case you probably already know them, but I agree that in general this is a bit cumbersome…
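Streams derives the changelog name as <application.id>-<state store name>-changelog, so pre-creating one with a larger message limit might look like this (all values illustrative; the partition count needs to match the corresponding input topic so the per-task changelog partitions line up):

    bin/kafka-topics.sh --zookeeper localhost:2181 --create \
      --topic my-app-my-store-changelog \
      --partitions 4 --replication-factor 1 \
      --config cleanup.policy=compact \
      --config max.message.bytes=10485760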

Re: Kafka streaming changelog topic max.message.bytes exception

2016-11-08 Thread Sachin Mittal
Per-message payload size. The basic question is how can I control the internal changelog topics' parameters so as to avoid these errors. On Tue, Nov 8, 2016 at 11:37 PM, R Krishna wrote: > Are you talking about total messages and therefore size, or per-message > payload size? > > On Tue, Nov 8,…

Re: Kafka streaming changelog topic max.message.bytes exception

2016-11-08 Thread R Krishna
Are you talking about total messages and therefore size, or per-message payload size? On Tue, Nov 8, 2016 at 10:00 AM, Sachin Mittal wrote: > Message size itself increases over time. > > Message is something like > key=[list of objects] > > This increases with time and then at a point Kafka is…

Re: Kafka streaming changelog topic max.message.bytes exception

2016-11-08 Thread Sachin Mittal
Message size itself increases over time. The message is something like key=[list of objects]. This increases with time, and then at a point Kafka is not able to add any message to its topic because the message size is greater than max.message.bytes. Since this is an internal topic based off a table, I d…

Re: Kafka streaming changelog topic max.message.bytes exception

2016-11-08 Thread Eno Thereska
Hi Sachin, Could you clarify what you mean by "message size increases"? Are messages going to the changelog topic increasing in size? Or is the changelog topic getting full? Thanks, Eno > On 8 Nov 2016, at 16:49, Sachin Mittal wrote: > > Hi, > We are using aggregation by key on a kstream to…

Kafka streaming changelog topic max.message.bytes exception

2016-11-08 Thread Sachin Mittal
Hi, We are using aggregation by key on a kstream to create a ktable. As I read from https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams%3A+Internal+Data+Management it creates an internal changelog topic. However, over the time the streaming application runs, the message size increases and…
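For a sense of scale (illustrative numbers, not from the thread): the broker default message.max.bytes is roughly 1 MB, so if each object in the list serializes to about 200 bytes, a key's changelog record crosses the limit after some 5,000 updates, because every update re-writes the entire aggregated list for that key as a single message.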