lct45 commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r478489438



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -232,40 +239,54 @@ private void processEarly(final K key, final V value, final long timestamp, fina
                     final long startTime = next.key.window().start();
                     final long endTime = startTime + windows.timeDifferenceMs();
 
-                    if (endTime == windows.timeDifferenceMs()) {
+                    if (startTime == 0) {
                         combinedWindow = next;
-                    } else if (endTime > timestamp && startTime <= timestamp) {
+                    } else if (endTime >= timestamp && startTime <= timestamp) {
                         rightWinAgg = next.value;
                         putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
-                    } else {
+                    } else if (startTime == timestamp + 1) {
                         rightWinAlreadyCreated = true;
                     }
                 }
             }
 
+            // if there wasn't a right window agg found and we need a right window for our new record,
+            // the current aggregate in the combined window iwll go in the new record's right window

Review comment:
       There can be more than one record in the combined window. But if we 
don't find any right window agg, then there can't be records both to the left 
of this record (in time) AND to the right of it; there could be records on one 
side or the other. There could be multiple to the right, though.
   
   It took me about 5 minutes to figure this out, but the call to 
`previousRightWindowPossible` happens when we're creating the right window for 
the previous record, not for the current record. This call doesn't involve 
`rightWinAgg` at all, since the check `rightWindowStart > 
currentRecordTimestamp` wouldn't help us figure out whether `previous record > 
current record`.
   
   The check for this is actually a little further down (line 277), where we 
check `rightWindowIsNotEmpty`; that confirms that the values in `rightWinAgg` 
are > timestamp. 
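
   For reference, here is a minimal standalone sketch of the three-way branch 
in the diff above, so the boundary cases are easy to try out. The class name 
`EarlyWindowClassifier`, the `Kind` enum, and the `classify` helper are 
hypothetical and not part of the PR; only the three conditions are taken from 
the new side of the diff, and `timeDifferenceMs` stands in for 
`windows.timeDifferenceMs()`.

```java
public class EarlyWindowClassifier {

    // Hypothetical labels for the branches in processEarly (not in the PR).
    enum Kind { COMBINED, CONTAINS_RECORD, RIGHT_WINDOW_EXISTS, OTHER }

    // Mirrors the conditions on the "+" side of the diff, in the same order.
    static Kind classify(final long startTime, final long timestamp, final long timeDifferenceMs) {
        final long endTime = startTime + timeDifferenceMs;
        if (startTime == 0) {
            // the diff assigns combinedWindow = next here
            return Kind.COMBINED;
        } else if (endTime >= timestamp && startTime <= timestamp) {
            // the window covers the record; the diff sets rightWinAgg and forwards
            return Kind.CONTAINS_RECORD;
        } else if (startTime == timestamp + 1) {
            // the diff sets rightWinAlreadyCreated = true
            return Kind.RIGHT_WINDOW_EXISTS;
        }
        // no branch in the diff applies to this window
        return Kind.OTHER;
    }

    public static void main(final String[] args) {
        final long timeDifferenceMs = 10; // assumed value, for illustration only
        final long timestamp = 5;         // an "early" record: timestamp < timeDifferenceMs
        for (final long start : new long[] {0, 3, 6, 8}) {
            System.out.println(start + " -> " + classify(start, timestamp, timeDifferenceMs));
        }
    }
}
```

   With these assumed values, a window starting at 6 (`timestamp + 1`) is the 
only one that would set `rightWinAlreadyCreated`, while a window starting at 8 
falls through all three branches.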




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

