ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r485221052



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -146,96 +165,203 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
             boolean leftWinAlreadyCreated = false;
             boolean rightWinAlreadyCreated = false;
 
-            // keep the left type window closest to the record
-            Window latestLeftTypeWindow = null;
+            Long previousRecordTimestamp = null;
+
             try (
                 final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> 
iterator = windowStore.fetch(
                     key,
                     key,
-                    timestamp - 2 * windows.timeDifferenceMs(),
+                    Math.max(0, inputRecordTimestamp - 2 * 
windows.timeDifferenceMs()),
                     // to catch the current record's right window, if it 
exists, without more calls to the store
-                    timestamp + 1)
+                    inputRecordTimestamp + 1)
             ) {
                 while (iterator.hasNext()) {
-                    final KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next = 
iterator.next();
-                    windowStartTimes.add(next.key.window().start());
-                    final long startTime = next.key.window().start();
+                    final KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> 
windowBeingProcessed = iterator.next();
+                    final long startTime = 
windowBeingProcessed.key.window().start();
+                    windowStartTimes.add(startTime);
                     final long endTime = startTime + 
windows.timeDifferenceMs();
+                    final long windowMaxRecordTimestamp = 
windowBeingProcessed.value.timestamp();
 
-                    if (endTime < timestamp) {
-                        leftWinAgg = next.value;
-                        if (isLeftWindow(next)) {
-                            latestLeftTypeWindow = next.key.window();
-                        }
-                    } else if (endTime == timestamp) {
+                    if (endTime < inputRecordTimestamp) {
+                        leftWinAgg = windowBeingProcessed.value;
+                        previousRecordTimestamp = windowMaxRecordTimestamp;
+                    } else if (endTime == inputRecordTimestamp) {
                         leftWinAlreadyCreated = true;
-                        putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
-                    } else if (endTime > timestamp && startTime <= timestamp) {
-                        rightWinAgg = next.value;
-                        putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
-                    } else {
+                        if (windowMaxRecordTimestamp < inputRecordTimestamp) {
+                            previousRecordTimestamp = windowMaxRecordTimestamp;
+                        }
+                        
updateWindowAndForward(windowBeingProcessed.key.window(), 
windowBeingProcessed.value, key, value, closeTime, inputRecordTimestamp);
+                    } else if (endTime > inputRecordTimestamp && startTime <= 
inputRecordTimestamp) {
+                        rightWinAgg = windowBeingProcessed.value;
+                        
updateWindowAndForward(windowBeingProcessed.key.window(), 
windowBeingProcessed.value, key, value, closeTime, inputRecordTimestamp);
+                    } else if (startTime == inputRecordTimestamp + 1) {
                         rightWinAlreadyCreated = true;
+                    } else {
+                        throw new IllegalStateException("Unexpected window 
found when processing sliding windows");

Review comment:
       nit: log an error and include the relevant info (eg `windowStart` and 
`inputRecordTimestamp` at least). Same for the IllegalStateException in 
`processEarly`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to