[ https://issues.apache.org/jira/browse/KAFKA-13739?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Bruno Cadonna updated KAFKA-13739:
----------------------------------
    Fix Version/s: 3.2.0

> Sliding window without grace not working
> ----------------------------------------
>
>                 Key: KAFKA-13739
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13739
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.1.0
>            Reporter: bounkong khamphousone
>            Assignee: bounkong khamphousone
>            Priority: Minor
>              Labels: beginner, newbie
>             Fix For: 3.2.0
>
>
> Hi everyone! I would like to understand why the Kafka Streams DSL offers the
> ability to define a sliding window with no grace period when it seems that
> it doesn't work. [Confluent's
> site|https://docs.confluent.io/platform/current/streams/developer-guide/dsl-api.html#sliding-time-windows]
> states that the grace period is required and that, with the deprecated
> method, it defaults to 24 hours.
> With a basic sliding window and a count, if I set the grace period to 1 ms,
> I get the expected output. According to the sliding window documentation,
> the lower and upper bounds are inclusive.
> If I set the grace period to 0 ms, I can see that the record is not skipped
> at KStreamSlidingWindowAggregate (l.126), but when we try to create the
> window and push the event in KStreamSlidingWindowAggregate#createWindows, we
> call the method updateWindowAndForward (l.417). This method (l.468) checks
> that {{windowEnd > closeTime}}.
> closeTime is defined as {{observedStreamTime - window.gracePeriodMs}}
> (sliding window configuration).
> windowEnd is defined as {{inputRecordTimestamp}}.
>
> For a first event with a record timestamp, we can assume that
> observedStreamTime is equal to inputRecordTimestamp.
>
> Therefore, closeTime is {{inputRecordTimestamp - 0}} (gracePeriodMs), which
> evaluates to {{inputRecordTimestamp}}.
> If we go back to the check done in the {{updateWindowAndForward}} method, we
> then have {{inputRecordTimestamp > inputRecordTimestamp}}, which is always
> false. The record is therefore skipped for its own window.
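
The arithmetic in the report can be reproduced with a small standalone sketch. This is only a simplified model of the comparison as the reporter describes it, not the Kafka Streams source: the class name `SlidingWindowCloseCheck`, its method names, and the timestamp 1000 are hypothetical and chosen purely for illustration. It also includes the inclusive comparison that the report goes on to suggest, for contrast.

```java
// Simplified model of the close-time check described in the report.
// All names here are illustrative assumptions, not Kafka Streams internals.
final class SlidingWindowCloseCheck {

    // closeTime as stated in the report: observedStreamTime - gracePeriodMs.
    static long closeTime(long observedStreamTime, long gracePeriodMs) {
        return observedStreamTime - gracePeriodMs;
    }

    // Strict comparison, as the report says the current code does.
    static boolean acceptedWithStrictCheck(long windowEnd,
                                           long observedStreamTime,
                                           long gracePeriodMs) {
        return windowEnd > closeTime(observedStreamTime, gracePeriodMs);
    }

    // Inclusive comparison, as the reporter suggests.
    static boolean acceptedWithInclusiveCheck(long windowEnd,
                                              long observedStreamTime,
                                              long gracePeriodMs) {
        return windowEnd >= closeTime(observedStreamTime, gracePeriodMs);
    }

    public static void main(String[] args) {
        // First record: stream time equals the record timestamp, and the
        // record's own window ends at that same timestamp.
        long ts = 1_000L; // arbitrary example timestamp

        System.out.println(acceptedWithStrictCheck(ts, ts, 0L));    // false: 1000 > 1000
        System.out.println(acceptedWithStrictCheck(ts, ts, 1L));    // true:  1000 > 999
        System.out.println(acceptedWithInclusiveCheck(ts, ts, 0L)); // true:  1000 >= 1000
    }
}
```

Under these assumptions, a grace period of 0 ms rejects the first record with the strict check, while 1 ms accepts it, which matches the behaviour described in the report.
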
> Given that the lower and upper bounds are inclusive, I would have expected
> the event to be pushed to the store and forwarded. Hence, the check would be
> {{windowEnd >= closeTime}}.
>
> Is this a bug, or is it intended?
> Thanks in advance for your explanations!
> Best regards!



--
This message was sent by Atlassian Jira
(v8.20.1#820001)