lct45 commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r478743528



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -160,11 +160,18 @@ public void processInOrder(final K key, final V value, final long timestamp) {
 
                     if (endTime < timestamp) {
                         leftWinAgg = next.value;
+                        // store the combined window if it is found so that a right window can be created for
+                        // the combined window's max record, as needed
                         if (isLeftWindow(next) || endTime == windows.timeDifferenceMs()) {

Review comment:
       Yeah, I think this makes a lot of sense. The one catch I found is that we
check whether `latestLeftTypeWindow != null` when creating the current
record's left window, and if we update `previousRecord` each time we see a
window, we could end up with a timestamp that isn't actually within range of
the current record's left window. For example: a record comes in at 30 with
timeDifference = 10. We have a window [10,20], but it's a right window, and the
only record within it is at 12. Since that is the max record within the window,
we would store it in `previousRecordTimestamp`, but its right window is
[13,23], which we don't want to create. So I think storing just the long is
fine (I'm mostly typing this out to work through my thought process), and the
`rightWindowNecessaryAndPossible` check would weed out creating the [13,23]
window by accident. Does this all seem right?
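
To make the example concrete, here is a rough sketch of how I picture that check behaving. This is not the code in the PR: the class name, the parameter list, and the exact conditions are assumptions made for illustration, and it only borrows the name `rightWindowNecessaryAndPossible`. It assumes the right window of a record at `t` is `[t + 1, t + 1 + timeDifference]`, that the window is "necessary" if no window with that start already exists, and that it is "possible" if the current record falls inside it.

```java
import java.util.HashSet;
import java.util.Set;

public class RightWindowCheckSketch {

    // Hypothetical stand-in for the check named in the review comment;
    // the signature and conditions are assumptions, not the PR's code.
    static boolean rightWindowNecessaryAndPossible(final Set<Long> windowStartTimes,
                                                   final long previousRecordTimestamp,
                                                   final long currentRecordTimestamp,
                                                   final long timeDifferenceMs) {
        // Assumed definition: a record's right window starts 1ms after the record.
        final long rightWindowStart = previousRecordTimestamp + 1;
        final long rightWindowEnd = rightWindowStart + timeDifferenceMs;

        // "Necessary": no window with this start time exists yet.
        final boolean necessary = !windowStartTimes.contains(rightWindowStart);
        // "Possible": the current record actually falls inside the window.
        final boolean possible = rightWindowEnd >= currentRecordTimestamp;

        return necessary && possible;
    }

    public static void main(final String[] args) {
        final Set<Long> existingWindowStarts = new HashSet<>();
        existingWindowStarts.add(10L); // the [10,20] window from the example

        // Previous record at 12, current record at 30, timeDifference = 10:
        // the right window would be [13,23], which does not contain 30.
        System.out.println(
            rightWindowNecessaryAndPossible(existingWindowStarts, 12L, 30L, 10L));

        // Previous record at 25 instead: the right window [26,36] contains 30.
        System.out.println(
            rightWindowNecessaryAndPossible(existingWindowStarts, 25L, 30L, 10L));
    }
}
```

Under these assumptions the first call rejects the [13,23] window, so storing only the long timestamp would be enough, and the second call shows the check still lets through a right window that the current record does fall into.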




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

