mjsax commented on a change in pull request #9157: URL: https://github.com/apache/kafka/pull/9157#discussion_r483214805
########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ########## @@ -192,29 +213,125 @@ public void processInOrder(final K key, final V value, final long timestamp) { //create left window for new record if (!leftWinAlreadyCreated) { final ValueAndTimestamp<Agg> valueAndTime; - //there's a right window that the new record could create --> new record's left window is not empty - if (latestLeftTypeWindow != null) { + // if there's a right window that the new record could create && previous record falls within left window -> new record's left window is not empty + if (previousRecordTimestamp != null && leftWindowNotEmpty(previousRecordTimestamp, timestamp)) { valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp); } else { valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); } final TimeWindow window = new TimeWindow(timestamp - windows.timeDifferenceMs(), timestamp); putAndForward(window, valueAndTime, key, value, closeTime, timestamp); } - //create right window for new record if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) { - final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs()); - final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp)); + createCurrentRecordRightWindow(timestamp, rightWinAgg, key); + } + } + + /** + * Created to handle records where 0 < timestamp < timeDifferenceMs. These records would create + * windows with negative start times, which is not supported. Instead, they will fall within the [0, timeDifferenceMs] + * window, and we will update or create their right windows as new records come in later + */ + private void processEarly(final K key, final V value, final long timestamp, final long closeTime) { + // A window from [0, timeDifferenceMs] that holds all early records + KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = null; + ValueAndTimestamp<Agg> rightWinAgg = null; + boolean rightWinAlreadyCreated = false; + final Set<Long> windowStartTimes = new HashSet<>(); + + Long previousRecordTimestamp = null; + + try ( + final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch( + key, + key, + Math.max(0, timestamp - 2 * windows.timeDifferenceMs()), Review comment: Given, that we call `processEarly` only if `0 < timestamp < timeDifferenceMs`, we know that `timestamp - 2 * windows.timeDifferenceMs()` would always be negative? Thus, we can just pass in zero here? If this is correct, we might want to add a check at the beginning of this method: ``` if (timestamp < 0 || timestamp >= timeDifferenceMs) { throw new IllegalArgumentException("..."); } ``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org