lct45 commented on a change in pull request #9157: URL: https://github.com/apache/kafka/pull/9157#discussion_r478489438
########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ########## @@ -232,40 +239,54 @@ private void processEarly(final K key, final V value, final long timestamp, fina final long startTime = next.key.window().start(); final long endTime = startTime + windows.timeDifferenceMs(); - if (endTime == windows.timeDifferenceMs()) { + if (startTime == 0) { combinedWindow = next; - } else if (endTime > timestamp && startTime <= timestamp) { + } else if (endTime >= timestamp && startTime <= timestamp) { rightWinAgg = next.value; putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp); - } else { + } else if (startTime == timestamp + 1) { rightWinAlreadyCreated = true; } } } + // if there wasn't a right window agg found and we need a right window for our new record, + // the current aggregate in the combined window iwll go in the new record's right window Review comment: There can be more than one record in the combined window, but if we don't find any right window agg then there are no records to the left of this record (in time) AND to the right of this record (could be one or the other). There could be multiple to the right though. It took me like 5 minutes to figure this out, but actually the call to `previousRightWindowPossible` is when we're creating the right window for the previous record, not for the current record. This call doesn't involved `rightWindowAgg` at all, since the check `rightWindowStart > currentRecordTimestamp` wouldn't help us figure out if `previous record > current record`. The check for this is actually a little further down (line 277) where we do `rightWindowIsNotEmpty`, and there it confirms that the values in rightWinAgg are > timestamp. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org