Hi Sachin

Here is a possible general approach that might work for you in the absence
of any broadcast variable, given that everything is local state, and
provided you can adjust your process to do the cleanup before the
aggregation.

The basic idea is to use a custom topology with a custom processor backed
by a persistent state store, and to use it to transform the data and send
it to a result topic so that the aggregation becomes easier. For a given
key, you can modify the incoming data based on the old data held in this
state store (or process it however you want) and then forward it to a
final topic.

Below is the link to a sample custom processor:

https://apache.googlesource.com/kafka/+/0.10.0/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java

Note that this class accesses its state store in init(); in our case the
store name should be "oldvalues", as per the pseudo code below.
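
For illustration, here is a minimal sketch of such a processor (the class
name and the clean() logic are placeholders of mine, not from the demo; it
assumes String keys and values and the "oldvalues" store from the pseudo
code below):

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

public class OldValueProcessor implements Processor<String, String> {

    private ProcessorContext context;
    private KeyValueStore<String, String> oldValues;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.context = context;
        // must match the store name used when building the topology
        this.oldValues = (KeyValueStore<String, String>)
                context.getStateStore("oldvalues");
    }

    @Override
    public void process(String key, String value) {
        // adjust the new value using whatever was remembered for this key
        String previous = oldValues.get(key);
        String cleaned = clean(previous, value);
        oldValues.put(key, cleaned);
        // forward the cleaned record to the downstream sink
        context.forward(key, cleaned);
    }

    private String clean(String previous, String value) {
        // placeholder: drop already-consumed entries, merge lists, etc.
        return value;
    }

    @Override
    public void punctuate(long timestamp) {
        // no scheduled work
    }

    @Override
    public void close() {
        // nothing to release; the store is managed by Kafka Streams
    }
}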

It could be done something like the following (if you use the
TopologyBuilder API). Here is some pseudo code, which you may have already
seen in the docs:

TopologyBuilder builder = new TopologyBuilder();

builder.addSource("source", "sourcetopic");
// wire in the custom processor (the sketch above)
builder.addProcessor("myprocessor", OldValueProcessor::new, "source");

// persistent store that the processor looks up in init();
// stringSerde here would be Serdes.String()
StateStoreSupplier oldValueStore = Stores.create("oldvalues")
        .withKeys(stringSerde)
        .withValues(stringSerde)
        .persistent()
        .build();
builder.addStateStore(oldValueStore, "myprocessor");

// write the cleaned records to the intermediate topic
builder.addSink("sink", "toAggregationTopic", "myprocessor");

Now create the final KTable from this toAggregationTopic, which holds the
cleaned data.
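
For example, a sketch using the 0.10.0-era DSL (the reduce logic is just a
placeholder that keeps the latest value, and "aggstore" is a hypothetical
store name):

KStreamBuilder kstreamBuilder = new KStreamBuilder();
KTable<String, String> cleanedTable = kstreamBuilder
        .stream(stringSerde, stringSerde, "toAggregationTopic")
        .reduceByKey((aggValue, newValue) -> newValue,
                stringSerde, stringSerde, "aggstore");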


Hope this helps; please ignore this if I have misunderstood your use case.

Regards

Sai

On Thu, Nov 10, 2016 at 8:40 PM, Sachin Mittal <sjmit...@gmail.com> wrote:

> Hi,
> On the subject of deleting values from the list, I have the following
> topology:
> aggregate(old list) = new list -> left join (another list) -> output sink.
>
> While processing the output list I know which values in the list are old
> and can be removed.
> Is there any way to pass that information from downstream back upstream?
> Any thoughts on how I can pass this information?
>
> One thing I can think of is to set some global variable in the output
> sink.
>
> So the next time the aggregate function is run, it can look up the global
> variable and remove items from the list.
> So new list = old list + new values added - old values removed.
>
> In Spark we have something like broadcast variables to do the same.
>
> Is there a similar concept in Kafka Streams?
>
> This way we can keep the changelog topic messages from growing and
> prevent the max.message.bytes exception.
>
> Thanks
> Sachin
>
>
>
>
>
> On Fri, Nov 11, 2016 at 1:02 AM, Matthias J. Sax <matth...@confluent.io>
> wrote:
>
> > Sachin,
> >
> > my comment about deleting was about deleting values from the list, not
> > about deleting the whole key/value record.
> >
> > If you want to delete a whole key/value record when there is no update
> > for it for some time, you can combine compaction with retention. You
> > need to alter the configuration of the changelog topics and set
> >
> > cleanup.policy=compact,delete
> >
> > Then retention.ms will be applied to the changelog, too.
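> >
> > For example, something like this (the topic name and retention value
> > are placeholders; depending on the tool version you may need to quote
> > or escape the comma in the policy value):
> >
> > bin/kafka-topics.sh --zookeeper localhost:2181 --alter \
> >   --topic my-app-mystore-changelog \
> >   --config cleanup.policy=compact,delete \
> >   --config retention.ms=604800000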
> >
> >
> > -Matthias
> >
> > On 11/10/16 3:17 AM, Sachin Mittal wrote:
> > > Hi, as per Eno's suggestion I have pre-created the internal
> > > changelog topics with an increased max.message.bytes config to
> > > handle the big messages, which grow over time.
> > >
> > > As Matthias has pointed out, we cannot use the retention.ms setting
> > > to delete older message data after a given time. Is there a way to
> > > purge older messages from my changelog topic?
> > >
> > > Remember that my changelog topic is key=list of objects, and this
> > > grows with time.
> > >
> > > So I would like these to be deleted from time to time: once I have
> > > consumed the objects, the key/value can be deleted. If I later get
> > > a new object for the same key, that's a new message, and the old
> > > data has no use for the streaming application.
> > >
> > > So how can I achieve this? Would retention.bytes help here?
> > >
> > > Is there a way to set an expire-after (or something similar) at the
> > > message level, so that some Kafka thread would purge those
> > > messages?
> > >
> > > Thanks Sachin
> > >
> > >
> > >
> > > On Wed, Nov 9, 2016 at 1:42 AM, Matthias J. Sax
> > > <matth...@confluent.io> wrote:
> > >
> > > My two cents:
> > >
> > > Changelog topics are compacted topics, thus they do not have a
> > > retention time (there is an exception for windowed KTable changelog
> > > topics, which are compacted and do have a retention time, though).
> > >
> > > However, I do not understand how changing the retention time would
> > > fix the issue. If your list of values grows and exceeds
> > > max.message.bytes, you will need to increase this parameter (or
> > > shrink your value).
> > >
> > > Besides this, Eno's answer is the way to go. In order to figure
> > > out internal topic names, you can use KafkaStreams#toString().
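> > >
> > > For example (a sketch; builder and streamsConfig are the objects you
> > > would already have when wiring up the application):
> > >
> > > KafkaStreams streams = new KafkaStreams(builder, streamsConfig);
> > > streams.start();
> > > // prints the topology, including the internal topic names
> > > System.out.println(streams.toString());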
> > >
> > >
> > > -Matthias
> > >
> > >
> > >
> > > On 11/8/16 11:14 AM, Eno Thereska wrote:
> > >>>> Hi Sachin,
> > >>>>
> > >>>> One option right now would be to pre-create all internal
> > >>>> topics in Kafka, and only after that start the Kafka Streams
> > >>>> application. This would require you to know the internal names
> > >>>> of the topics (in this case you probably already know them, but
> > >>>> I agree that in general this is a bit cumbersome).
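> > >>>>
> > >>>> For reference, changelog topics are named
> > >>>> <application.id>-<state store name>-changelog. So, with
> > >>>> placeholder names and settings (the partition count should match
> > >>>> the source topic), pre-creating one could look like:
> > >>>>
> > >>>> bin/kafka-topics.sh --zookeeper localhost:2181 --create \
> > >>>>   --topic my-app-oldvalues-changelog \
> > >>>>   --partitions 1 --replication-factor 1 \
> > >>>>   --config cleanup.policy=compact \
> > >>>>   --config max.message.bytes=10485760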
> > >>>>
> > >>>> Eno
> > >>>>
> > >>>>> On 8 Nov 2016, at 18:10, Sachin Mittal
> > >>>>> <sjmit...@gmail.com> wrote:
> > >>>>>
> > >>>>> Per-message payload size. The basic question is how I can
> > >>>>> control the internal changelog topic parameters so as to
> > >>>>> avoid these errors.
> > >>>>>
> > >>>>>
> > >>>>> On Tue, Nov 8, 2016 at 11:37 PM, R Krishna
> > >>>>> <krishna...@gmail.com> wrote:
> > >>>>>
> > >>>>>> Are you talking about total messages (and therefore total
> > >>>>>> size), or per-message payload size?
> > >>>>>>
> > >>>>>> On Tue, Nov 8, 2016 at 10:00 AM, Sachin Mittal
> > >>>>>> <sjmit...@gmail.com> wrote:
> > >>>>>>
> > >>>>>>> Message size itself increases over time.
> > >>>>>>>
> > >>>>>>> A message is something like key=[list of objects].
> > >>>>>>>
> > >>>>>>> This grows with time, and at some point Kafka is not able
> > >>>>>>> to add the message to its topic because the message size is
> > >>>>>>> greater than max.message.bytes. Since this is an internal
> > >>>>>>> topic based off a table, I don't know how I can control
> > >>>>>>> this topic.
> > >>>>>>>
> > >>>>>>> If I can set some retention.ms for this topic then I can
> > >>>>>>> purge old messages, thereby ensuring that the message size
> > >>>>>>> stays within the limit.
> > >>>>>>>
> > >>>>>>> Thanks Sachin
> > >>>>>>>
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> On Tue, Nov 8, 2016 at 11:22 PM, Eno Thereska
> > >>>>>>> <eno.there...@gmail.com> wrote:
> > >>>>>>>
> > >>>>>>>> Hi Sachin,
> > >>>>>>>>
> > >>>>>>>> Could you clarify what you mean by "message size
> > >>>>>>>> increases"? Are messages going to the changelog topic
> > >>>>>>>> increasing in size? Or is the changelog topic getting full?
> > >>>>>>>>
> > >>>>>>>> Thanks Eno
> > >>>>>>>>
> > >>>>>>>>> On 8 Nov 2016, at 16:49, Sachin Mittal
> > >>>>>>>>> <sjmit...@gmail.com> wrote:
> > >>>>>>>>>
> > >>>>>>>>> Hi, we are using aggregation by key on a KStream to
> > >>>>>>>>> create a KTable. As I read from
> > >>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams%3A+Internal+Data+Management
> > >>>>>>>>> it creates an internal changelog topic.
> > >>>>>>>>>
> > >>>>>>>>> However, the longer the streaming application runs, the
> > >>>>>>>>> larger the messages get, and it starts throwing the
> > >>>>>>>>> max.message.bytes exception.
> > >>>>>>>>>
> > >>>>>>>>> Is there a way to control the retention.ms time for the
> > >>>>>>>>> internal changelog topics so that messages are purged
> > >>>>>>>>> before they exceed this size?
> > >>>>>>>>>
> > >>>>>>>>> If not, is there a way to control or avoid such an
> > >>>>>>>>> error?
> > >>>>>>>>>
> > >>>>>>>>> Thanks Sachin
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>> -- Radha Krishna, Proddaturi 253-234-5657
> > >>>>>>
> > >>>>
> > >>
> > >
> >
>
