Andre,

Back to your original question: I think KIP-138 could help, as it allows you to punctuate based on processing time as well.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-138%3A+Change+punctuate+semantics

Guozhang

On Mon, Jun 19, 2017 at 2:02 PM, Andre Eriksson <an...@tcell.io> wrote:

> So I'm trying to implement a rate limiting processing step using Kafka
> Streams (0.10.2.1).
>
> Basically, this step should just let messages through, unless similar
> messages have already been seen in the last N seconds, in which case it
> should aggregate them into a single message and then send them after N
> seconds have passed.
>
> I initially tried implementing it as a transform step, where process()
> either lets messages through or aggregates/stores them, and where
> punctuate() sends/clears out any stored messages older than N seconds.
> However, because punctuate() is only run when there's new data, messages
> won't be reliably sent when they should be.
>
> I then tried implementing my own scheduling that periodically sends/clears
> out messages using the ProcessorContext provided to the aforementioned
> transform step. However, it seems that when I call forward() from my
> scheduler (i.e. not in a process()/punctuate() call), I get a
> NullPointerException at ProcessorContextImpl.java:81 (
> https://github.com/apache/kafka/blob/0.10.2/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L81).
> I assume that this is because
> currentNode() is null outside of process()/punctuate() calls.
>
> I also looked at handling this via a groupBy() and aggregate() operation,
> but it seems this wouldn't meet my requirements (AFAICT there's no way to
> get it to send the first message instantly while also delaying subsequent
> messages).
>
> Is there any other way to do this? Perhaps there's a way to work around
> the NullPointerExceptions?
>
> PS. It seems that
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-138%3A+Change+punctuate+semantics
> would make my initial approach work,
> but unfortunately it seems to have been pushed to the next release (
> https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+0.11.0.0).

--
-- Guozhang
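
For illustration, here is a rough sketch of the logic described in the quoted message: let the first message for a key through, hold back later ones, and flush them from a timer that does not depend on new data arriving. It is plain Java and does not use the Kafka Streams API. The class name RateLimiterSketch, its two methods, and the comma-joined aggregate are all hypothetical stand-ins. process() and punctuate() here only mimic the roles of the Kafka Streams callbacks of the same name, with the current time passed in explicitly so the behaviour is easy to follow.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: models only the rate-limiting logic, not Kafka Streams itself.
class RateLimiterSketch {
    // Per-key state: when the current window started and which values were held back.
    private static final class Window {
        final long startMs;
        final List<String> held = new ArrayList<>();
        Window(long startMs) { this.startMs = startMs; }
    }

    private final long windowMs;
    private final Map<String, Window> windows = new HashMap<>();

    RateLimiterSketch(long windowMs) { this.windowMs = windowMs; }

    // Role of process(): returns the value if it is forwarded right away,
    // or null if it was held back for later aggregation.
    String process(String key, String value, long nowMs) {
        Window w = windows.get(key);
        boolean expiredAndEmpty =
            w != null && nowMs - w.startMs >= windowMs && w.held.isEmpty();
        if (w == null || expiredAndEmpty) {
            // Nothing similar seen within the window: start one and pass through.
            windows.put(key, new Window(nowMs));
            return value;
        }
        w.held.add(value);
        return null;
    }

    // Role of a processing-time punctuate(): called from a timer, independent of input.
    // Returns one aggregated value per key whose window has elapsed.
    Map<String, String> punctuate(long nowMs) {
        Map<String, String> out = new HashMap<>();
        Iterator<Map.Entry<String, Window>> it = windows.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Window> e = it.next();
            Window w = e.getValue();
            if (nowMs - w.startMs >= windowMs) {
                if (!w.held.isEmpty()) {
                    // Assumed aggregation: join the held values with commas.
                    out.put(e.getKey(), String.join(",", w.held));
                }
                it.remove(); // window closed; the next message passes through again
            }
        }
        return out;
    }
}
```

With processing-time punctuation, the body of punctuate() would run inside the processor's own callback, so forwarding the aggregated values from there should avoid the forward()-from-another-thread problem described above. That is an assumption about how the pieces fit together, not something tested against a Kafka release.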