John Roesler created KAFKA-8582:
-----------------------------------
Summary: Consider adding an ExpiredWindowRecordHandler to Suppress
Key: KAFKA-8582
URL: https://issues.apache.org/jira/browse/KAFKA-8582
Project: Kafka
Issue Type: Improvement
Reporter: John Roesler
I got some feedback on Suppress:
{quote}Specifying how to handle events outside the grace period does seem like
a business concern, and simply discarding them thus seems risky (for example
imagine any situation where money is involved).
This sort of situation is addressed by the late-triggering approach associated
with watermarks
(https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-102), given
this I wondered if you were considering adding anything similar?{quote}
It seems that, once a record arrives past the grace period for its window,
the state of the windowed aggregation has already been discarded, so any
aggregation result we computed for it would be incorrect. Worse, since the
window is already expired, we can't store the new (incorrect, but more
importantly expired) aggregation result either, so every subsequent super-late
record would face the same blank slate. Concretely: if you have three timely
records for a window followed by three more that arrive after the grace
period, and you're doing a count aggregation, you'd see the counts emitted for
the window as [1, 2, 3, 1, 1, 1].
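(For concreteness, here's a minimal sketch of the kind of suppressed windowed
count under discussion. The topic name, window size, and grace period are
illustrative, not prescriptive.)
{code:java}
import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded;

import java.time.Duration;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;

public class SuppressedCountExample {
    public static Topology topology() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("events")
            .groupByKey()
            // 5-minute windows; a record arriving more than 1 minute after
            // its window ends is past the grace period
            .windowedBy(TimeWindows.of(Duration.ofMinutes(5)).grace(Duration.ofMinutes(1)))
            .count()
            // emit one final count per window when it closes; today, records
            // arriving past the grace period are simply dropped, with no
            // hook for custom handling
            .suppress(Suppressed.untilWindowCloses(unbounded()));
        return builder.build();
    }
}
{code}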
I guess we could add a flag to the post-expiration results to indicate that
they're broken, but this seems like the wrong approach. The post-expiration
aggregation _results_ are meaningless, but I could see wanting to send the
past-expiration _input records_ to a dead-letter queue or something instead of
dropping them.
Along this line of thinking, I wonder if we should add an optional
past-expiration record handler interface to the suppression operator. Then you
could define your own logic, whether that's sending the record to a
dead-letter queue, routing it to an alerting pipeline, or even just crashing
the application before it can do something wrong. This would follow the same
pattern we use for deserialization errors, where custom logic can be supplied
via org.apache.kafka.streams.errors.DeserializationExceptionHandler.
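For illustration, such a handler might look something like the sketch below.
This is purely hypothetical: the interface name ExpiredWindowRecordHandler,
the response enum, and the handle signature are made up here by analogy with
DeserializationExceptionHandler, not an existing Kafka Streams API.
{code:java}
import org.apache.kafka.streams.processor.ProcessorContext;

// Hypothetical sketch only: this interface does not exist in Kafka Streams.
// The name and shape are modeled on DeserializationExceptionHandler.
public interface ExpiredWindowRecordHandler<K, V> {

    enum ExpiredWindowHandlerResponse {
        CONTINUE, // drop the record and keep processing
        FAIL      // shut the application down before it can do something wrong
    }

    // Invoked when a record arrives after the grace period for its window,
    // i.e. when the suppression operator would otherwise silently drop it.
    // An implementation could forward the record to a dead-letter topic,
    // emit an alert, or return FAIL to stop the application.
    ExpiredWindowHandlerResponse handle(ProcessorContext context, K key, V value);
}
{code}
The key point is that the handler would receive the past-expiration input
record itself, not a (meaningless) post-expiration aggregation result.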