lct45 commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r478490860
##########
File path:
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -232,40 +239,54 @@ private void processEarly(final K key, final V value,
final long timestamp, fina
final long startTime = next.key.window().start();
final long endTime = startTime + windows.timeDifferenceMs();
- if (endTime == windows.timeDifferenceMs()) {
+ if (startTime == 0) {
combinedWindow = next;
- } else if (endTime > timestamp && startTime <= timestamp) {
+ } else if (endTime >= timestamp && startTime <= timestamp) {
rightWinAgg = next.value;
putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
- } else {
+ } else if (startTime == timestamp + 1) {
rightWinAlreadyCreated = true;
}
}
}
+ // if there wasn't a right window agg found and we need a right window for our new record,
+ // the current aggregate in the combined window will go in the new record's right window
+ if (rightWinAgg == null && combinedWindow != null) {
+ rightWinAgg = combinedWindow.value;
+ }
+
if (combinedWindow == null) {
final TimeWindow window = new TimeWindow(0, windows.timeDifferenceMs());
final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
} else {
- //create the right window for the most recent max timestamp in the combined window
- final long rightWinStart = combinedWindow.value.timestamp() + 1;
- if (!windowStartTimes.contains(rightWinStart) && combinedWindow.value.timestamp() < timestamp) {
- final TimeWindow window = new TimeWindow(rightWinStart, rightWinStart + windows.timeDifferenceMs());
+ //create the right window for the combined window's max record before the current record was added
+ final long maxRightWindowStart = combinedWindow.value.timestamp() + 1;
+ //only create the right window if new record falls within it and it does not already exist
+ if (!windowStartTimes.contains(maxRightWindowStart) && previousRightWindowPossible(maxRightWindowStart, timestamp)) {
Review comment:
The comment here was unclear, that's my bad. It should read `only create
the previous record's right window if the new record falls within it ...`.
This part doesn't leverage `rightWinAgg`, since the agg in the previous
record's right window will just be the current record's value.
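
To make the intended condition concrete, here is a rough standalone sketch of the check the comment describes. It is not the code in this PR: the class `RightWindowSketch`, the helper `newRecordFallsWithin`, and the plain `long` parameters are hypothetical stand-ins, and the inclusive window bounds are assumed from the `endTime >= timestamp && startTime <= timestamp` check in the diff above. What `previousRightWindowPossible` actually does may differ.

```java
import java.util.HashSet;
import java.util.Set;

public class RightWindowSketch {

    // Hypothetical helper: true if the new record's timestamp lies inside
    // [rightWindowStart, rightWindowStart + timeDifferenceMs], both ends inclusive (assumed).
    static boolean newRecordFallsWithin(final long rightWindowStart,
                                        final long timestamp,
                                        final long timeDifferenceMs) {
        return rightWindowStart <= timestamp
            && timestamp <= rightWindowStart + timeDifferenceMs;
    }

    // Only create the previous record's right window if the new record falls
    // within it and that window does not already exist.
    static boolean shouldCreatePreviousRightWindow(final Set<Long> windowStartTimes,
                                                   final long previousRecordTimestamp,
                                                   final long timestamp,
                                                   final long timeDifferenceMs) {
        final long maxRightWindowStart = previousRecordTimestamp + 1;
        return !windowStartTimes.contains(maxRightWindowStart)
            && newRecordFallsWithin(maxRightWindowStart, timestamp, timeDifferenceMs);
    }

    public static void main(final String[] args) {
        final Set<Long> existing = new HashSet<>();
        // Previous max record at 3, new record at 5, time difference 10: window [4, 14] contains 5.
        System.out.println(shouldCreatePreviousRightWindow(existing, 3L, 5L, 10L)); // true
        // Out-of-order record at 2 lies before the window start of 4.
        System.out.println(shouldCreatePreviousRightWindow(existing, 3L, 2L, 10L)); // false
        // The window starting at 4 already exists, so it is not created again.
        existing.add(4L);
        System.out.println(shouldCreatePreviousRightWindow(existing, 3L, 5L, 10L)); // false
    }
}
```

When the window is created, the only record in it is the new one, which is why this branch does not need `rightWinAgg`.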