ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r478730258
##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -160,11 +160,18 @@ public void processInOrder(final K key, final V value, final long timestamp) {
                 if (endTime < timestamp) {
                     leftWinAgg = next.value;
+                    // store the combined window if it is found so that a right window can be created for
+                    // the combined window's max record, as needed
                     if (isLeftWindow(next) || endTime == windows.timeDifferenceMs()) {
                         latestLeftTypeWindow = next;
                     }
                 } else if (endTime == timestamp) {
                     leftWinAlreadyCreated = true;
+                    // if current record's left window is the combined window, need to check later if there is a
+                    // record that needs a right window within the combined window
+                    if (endTime == windows.timeDifferenceMs()) {
+                        latestLeftTypeWindow = next;
+                    }
Review comment:
> That being said, I get that this is confusing. Do you think changing
> the check to `if (endTime == windows.timeDifferenceMs() &&
> !isLeftWindow(next))` would make it seem cleaner?

Haha no, I don't think saying `if (!isLeftWindow(next)): then
latestLeftTypeWindow = next` would be less confusing. If we call a variable
`leftTypeWindow` then it should _always_ be a left type window.

That said, I now see what you meant here, and it's the same problem as above,
with the same fix: replace `latestLeftTypeWindow` with
`previousRecordTimestamp`. In that case I think we can just remove this check
entirely (i.e., not explicitly check whether it's the combined window); all we
need to do is make sure `previousRecordTimestamp` is set correctly.
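
To make the suggestion concrete, here is a rough, standalone sketch of what tracking `previousRecordTimestamp` could look like. It is not the code in this PR: the `Win` class, its `maxRecordTimestamp` field, and the `NONE` sentinel are hypothetical stand-ins for whatever the window store actually provides, and the loop only mirrors the two branches in the hunk above.

```java
import java.util.List;

public class PreviousRecordSketch {

    // Hypothetical stand-in for a stored window: its bounds plus the
    // largest record timestamp it has aggregated so far.
    public static final class Win {
        final long start;
        final long end;
        final long maxRecordTimestamp;

        public Win(final long start, final long end, final long maxRecordTimestamp) {
            this.start = start;
            this.end = end;
            this.maxRecordTimestamp = maxRecordTimestamp;
        }
    }

    // Assumed sentinel meaning "no earlier record was found".
    public static final long NONE = -1L;

    // Returns the latest record timestamp strictly before `timestamp`,
    // judged only by each window's max record. No window-type check
    // (left, right, or combined) is involved.
    public static long previousRecordTimestamp(final List<Win> windows, final long timestamp) {
        long previous = NONE;
        for (final Win next : windows) {
            if (next.end < timestamp) {
                // Window ends before the current record, so its max record is earlier.
                previous = Math.max(previous, next.maxRecordTimestamp);
            } else if (next.end == timestamp && next.maxRecordTimestamp < timestamp) {
                // Current record's left window (possibly the combined window)
                // that so far only holds earlier records.
                previous = Math.max(previous, next.maxRecordTimestamp);
            }
        }
        return previous;
    }

    public static void main(final String[] args) {
        // Assumed scenario: timeDifferenceMs = 10, earlier records at 3 and 8.
        final List<Win> windows = List.of(
            new Win(0, 10, 8),   // combined window [0, 10]
            new Win(4, 14, 8));  // right window of the record at 3
        System.out.println(previousRecordTimestamp(windows, 10));
    }
}
```

With those two windows and a new record at timestamp 10, the combined window is the record's left window, and the sketch picks up 8 as the previous record without ever asking whether the window is the combined one. Whether a right window then needs to be created for that record would be a separate check.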