Andre,

Back to your original question: I think KIP-138 could help, as it also allows
you to punctuate based on processing (wall-clock) time, not only stream time.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-138%3A+Change+punctuate+semantics
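
A minimal sketch of the rate-limiting logic you describe, assuming KIP-138 is
available. The class below is plain Java; `RateLimiter`, `offer` and `flush` are
hypothetical names, not Kafka APIs. In a Transformer you would call `offer` from
`transform()` and `flush` from a punctuator scheduled on wall-clock time (with
KIP-138, something like `context.schedule(intervalMs,
PunctuationType.WALL_CLOCK_TIME, callback)`), forwarding each aggregate with
`context.forward(...)` from inside that callback. State is kept in an in-memory
map for brevity; a real implementation would likely keep it in a state store so
buffered messages survive restarts and rebalances.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

class RateLimiter {
    // Hypothetical per-key state: when the current window opened and what is buffered.
    private static final class Window {
        long startMs;
        final List<String> pending = new ArrayList<>();
        Window(long startMs) { this.startMs = startMs; }
    }

    private final long windowMs;
    private final Map<String, Window> windows = new HashMap<>();

    RateLimiter(long windowMs) { this.windowMs = windowMs; }

    // Returns true if the message should be forwarded right away; otherwise it is buffered.
    public boolean offer(String key, String value, long nowMs) {
        Window w = windows.get(key);
        if (w == null || (w.pending.isEmpty() && nowMs - w.startMs >= windowMs)) {
            windows.put(key, new Window(nowMs)); // first message in a window passes through
            return true;
        }
        w.pending.add(value);                    // similar message seen recently: aggregate it
        return false;
    }

    // Meant to run periodically (e.g. from a wall-clock punctuator); returns one
    // aggregated message per key whose window has elapsed and has buffered values.
    public Map<String, List<String>> flush(long nowMs) {
        Map<String, List<String>> out = new HashMap<>();
        Iterator<Map.Entry<String, Window>> it = windows.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Window> e = it.next();
            Window w = e.getValue();
            if (nowMs - w.startMs < windowMs) continue; // window still open
            if (w.pending.isEmpty()) {
                it.remove();               // quiet key: next message passes straight through
            } else {
                out.put(e.getKey(), new ArrayList<>(w.pending));
                w.pending.clear();
                w.startMs = nowMs;         // the aggregate counts as a send: open a new window
            }
        }
        return out;
    }

    public static void main(String[] args) {
        RateLimiter rl = new RateLimiter(10_000);
        System.out.println(rl.offer("user-1", "a", 0));     // true: first message goes through
        System.out.println(rl.offer("user-1", "b", 2_000)); // false: buffered
        System.out.println(rl.offer("user-1", "c", 4_000)); // false: buffered
        System.out.println(rl.flush(5_000));                // {}: window not yet elapsed
        System.out.println(rl.flush(10_000));               // {user-1=[b, c]}
    }
}
```

Passing the current time in explicitly keeps the logic deterministic and easy to
test outside a topology. Restarting the window after an aggregate is sent is one
reading of "at most one message per key every N seconds"; a different policy may
suit your use case better.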

Guozhang

On Mon, Jun 19, 2017 at 2:02 PM, Andre Eriksson <an...@tcell.io> wrote:

> So I'm trying to implement a rate limiting processing step using Kafka
> Streams (0.10.2.1).
>
> Basically, this step should just let messages through, unless similar
> messages have already been seen in the last N seconds, in which case it
> should aggregate them into a single message and then send them after N
> seconds have passed.
>
> I initially tried implementing it as a transform step, where process()
> either lets messages through or aggregates/stores them, and where
> punctuate() sends/clears out any stored messages older than N seconds.
> However, because punctuate() is only run when there's new data, messages
> won't be reliably sent when they should be.
>
> I then tried implementing my own scheduling that periodically sends/clears
> out messages using the ProcessorContext provided to the aforementioned
> transform step. However, it seems that when I call forward() from my
> scheduler (i.e. not in a process()/punctuate() call), I get a
> NullPointerException at ProcessorContextImpl.java:81
> (https://github.com/apache/kafka/blob/0.10.2/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L81).
> I assume that this is because
> currentNode() is null outside of process()/punctuate() calls.
>
> I also looked at handling this via a groupBy() and aggregate() operation,
> but it seems this wouldn't meet my requirements (AFAICT there's no way to
> get it to send the first message instantly while also delaying subsequent
> messages).
>
> Is there any other way to do this? Perhaps there's a way to work around
> the NullPointerExceptions?
>
> PS. It seems that
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-138%3A+Change+punctuate+semantics
> would make my initial approach work, but unfortunately it seems to have been
> pushed to the next release
> (https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+0.11.0.0).

-- 
-- Guozhang
