lct45 commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r478744162



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -160,11 +160,18 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 
                     if (endTime < timestamp) {
                         leftWinAgg = next.value;
+                        // store the combined window if it is found so that a 
right window can be created for
+                        // the combined window's max record, as needed
                         if (isLeftWindow(next) || endTime == 
windows.timeDifferenceMs()) {
                             latestLeftTypeWindow = next;
                         }
                     } else if (endTime == timestamp) {
                         leftWinAlreadyCreated = true;
+                        // if current record's left window is the combined 
window, need to check later if there is a
+                        // record that needs a right window within the 
combined window
+                        if (endTime == windows.timeDifferenceMs()) {
+                            latestLeftTypeWindow = next;
+                        }

Review comment:
       I think we'd still need a check within this part of the `if()` to catch 
the previous timestamp, right? I think we could take out the `if (endTime == 
windows.TimeDifferenceMS())` and instead always do `previousRecord = 
next.value.timestamp`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to