ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r481338030



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -164,11 +161,13 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 
                     if (endTime < timestamp) {
                         leftWinAgg = next.value;
-                        if (isLeftWindow(next)) {
-                            latestLeftTypeWindow = next.key.window();
-                        }
+                        // update to store the previous record

Review comment:
       This comment doesn't really add anything; it just restates what the 
code does. Also, don't we need to check that `windowMaxTimestamp > 
previousRecordTimestamp` before updating `previousRecordTimestamp`? (Here 
`windowMaxTimestamp = next.value.timestamp`; it would be nice to assign this 
to a variable with an explicit name to make it clear what 
`next.value.timestamp` actually means.)
   The same goes for the update below. You could put the check in a 
`maybeUpdatePreviousRecordTimestamp()` method and call it from both places.
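
   To make the suggested check concrete, here is a rough sketch of what such a 
helper could look like. It is only an illustration: the class name is 
hypothetical, the helper is written as a pure function rather than a method 
that mutates a local, and `windowMaxTimestamp` stands in for 
`next.value.timestamp()`.

```java
final class PreviousRecordTimestampSketch {

    /**
     * Returns the timestamp that should be tracked as the previous record's.
     * windowMaxTimestamp plays the role of next.value.timestamp(), i.e. the
     * largest record timestamp aggregated into the window being iterated over.
     * (Hypothetical helper; not the actual implementation in the PR.)
     */
    public static Long maybeUpdatePreviousRecordTimestamp(final Long previousRecordTimestamp,
                                                          final long windowMaxTimestamp) {
        // Only ever move forward, so we keep the latest timestamp seen so far.
        if (previousRecordTimestamp == null || windowMaxTimestamp > previousRecordTimestamp) {
            return windowMaxTimestamp;
        }
        return previousRecordTimestamp;
    }
}
```

   Both call sites would then assign the result back, e.g. 
`previousRecordTimestamp = maybeUpdatePreviousRecordTimestamp(previousRecordTimestamp, windowMaxTimestamp);`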

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -146,13 +142,14 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
             boolean leftWinAlreadyCreated = false;
             boolean rightWinAlreadyCreated = false;
 
-            // keep the left type window closest to the record
-            Window latestLeftTypeWindow = null;
+            // Store the previous record
+            Long previousRecord = null;

Review comment:
       Use `previousRecordTimestamp`, as in `processEarly`. You can then 
probably remove the comment.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -192,29 +191,113 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
             //create left window for new record
             if (!leftWinAlreadyCreated) {
                 final ValueAndTimestamp<Agg> valueAndTime;
-                //there's a right window that the new record could create --> 
new record's left window is not empty
-                if (latestLeftTypeWindow != null) {
+                // if there's a right window that the new record could create 
&& previous record falls within left window -> new record's left window is not 
empty
+                if (previousRecord != null && 
leftWindowNotEmpty(previousRecord, timestamp)) {
                     valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
                 } else {
                     valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
                 }
                 final TimeWindow window = new TimeWindow(timestamp - 
windows.timeDifferenceMs(), timestamp);
                 putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
             }
-            //create right window for new record
             if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
-                final TimeWindow window = new TimeWindow(timestamp + 1, 
timestamp + 1 + windows.timeDifferenceMs());
-                final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(getValueOrNull(rightWinAgg), 
Math.max(rightWinAgg.timestamp(), timestamp));
+                createRightWindow(timestamp, rightWinAgg, key, value, 
closeTime);
+            }
+        }
+
+        /**
+         * Created to handle records that have a timestamp > 0 but < 
timeDifference. These records would create
+         * windows with negative start times, which is not supported. Instead, 
they will fall within the [0, timeDifference]
+         * window, and we will update their right windows as new records come 
in later
+         */
+        private void processEarly(final K key, final V value, final long 
timestamp, final long closeTime) {
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+            //window from [0,timeDifference] that holds all early records

Review comment:
       ```suggestion
               // A window from [0, timeDifferenceMs] that holds all early 
records
   ```
   Also, I'd suggest putting the `combinedWindow` declaration (and comment) 
above `rightWinAgg` to avoid ambiguity about what the comment refers to.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -192,29 +191,113 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
             //create left window for new record
             if (!leftWinAlreadyCreated) {
                 final ValueAndTimestamp<Agg> valueAndTime;
-                //there's a right window that the new record could create --> 
new record's left window is not empty
-                if (latestLeftTypeWindow != null) {
+                // if there's a right window that the new record could create 
&& previous record falls within left window -> new record's left window is not 
empty
+                if (previousRecord != null && 
leftWindowNotEmpty(previousRecord, timestamp)) {
                     valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
                 } else {
                     valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
                 }
                 final TimeWindow window = new TimeWindow(timestamp - 
windows.timeDifferenceMs(), timestamp);
                 putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
             }
-            //create right window for new record
             if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
-                final TimeWindow window = new TimeWindow(timestamp + 1, 
timestamp + 1 + windows.timeDifferenceMs());
-                final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(getValueOrNull(rightWinAgg), 
Math.max(rightWinAgg.timestamp(), timestamp));
+                createRightWindow(timestamp, rightWinAgg, key, value, 
closeTime);
+            }
+        }
+
+        /**
+         * Created to handle records that have a timestamp > 0 but < 
timeDifference. These records would create
+         * windows with negative start times, which is not supported. Instead, 
they will fall within the [0, timeDifference]
+         * window, and we will update their right windows as new records come 
in later

Review comment:
       update, or create? (or both?)

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
##########
@@ -239,6 +239,7 @@ private void doCountSlidingWindows(final  
MockProcessorSupplier<Windowed<String>
             inputTopic.pipeInput("2", "B", 1000L);
             inputTopic.pipeInput("3", "C", 600L);
         }
+

Review comment:
       nit: remove this added line

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -192,29 +191,113 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
             //create left window for new record
             if (!leftWinAlreadyCreated) {
                 final ValueAndTimestamp<Agg> valueAndTime;
-                //there's a right window that the new record could create --> 
new record's left window is not empty
-                if (latestLeftTypeWindow != null) {
+                // if there's a right window that the new record could create 
&& previous record falls within left window -> new record's left window is not 
empty
+                if (previousRecord != null && 
leftWindowNotEmpty(previousRecord, timestamp)) {
                     valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
                 } else {
                     valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
                 }
                 final TimeWindow window = new TimeWindow(timestamp - 
windows.timeDifferenceMs(), timestamp);
                 putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
             }
-            //create right window for new record
             if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
-                final TimeWindow window = new TimeWindow(timestamp + 1, 
timestamp + 1 + windows.timeDifferenceMs());
-                final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(getValueOrNull(rightWinAgg), 
Math.max(rightWinAgg.timestamp(), timestamp));
+                createRightWindow(timestamp, rightWinAgg, key, value, 
closeTime);
+            }
+        }
+
+        /**
+         * Created to handle records that have a timestamp > 0 but < 
timeDifference. These records would create
+         * windows with negative start times, which is not supported. Instead, 
they will fall within the [0, timeDifference]

Review comment:
       ```suggestion
            * windows with negative start times, which is not supported. 
Instead, they will fall within the [0, timeDifferenceMs]
   ```

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -192,29 +191,113 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
             //create left window for new record
             if (!leftWinAlreadyCreated) {
                 final ValueAndTimestamp<Agg> valueAndTime;
-                //there's a right window that the new record could create --> 
new record's left window is not empty
-                if (latestLeftTypeWindow != null) {
+                // if there's a right window that the new record could create 
&& previous record falls within left window -> new record's left window is not 
empty
+                if (previousRecord != null && 
leftWindowNotEmpty(previousRecord, timestamp)) {
                     valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
                 } else {
                     valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
                 }
                 final TimeWindow window = new TimeWindow(timestamp - 
windows.timeDifferenceMs(), timestamp);
                 putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
             }
-            //create right window for new record
             if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
-                final TimeWindow window = new TimeWindow(timestamp + 1, 
timestamp + 1 + windows.timeDifferenceMs());
-                final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(getValueOrNull(rightWinAgg), 
Math.max(rightWinAgg.timestamp(), timestamp));
+                createRightWindow(timestamp, rightWinAgg, key, value, 
closeTime);
+            }
+        }
+
+        /**
+         * Created to handle records that have a timestamp > 0 but < 
timeDifference. These records would create
+         * windows with negative start times, which is not supported. Instead, 
they will fall within the [0, timeDifference]
+         * window, and we will update their right windows as new records come 
in later
+         */
+        private void processEarly(final K key, final V value, final long 
timestamp, final long closeTime) {
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+            //window from [0,timeDifference] that holds all early records
+            KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = 
null;
+            boolean rightWinAlreadyCreated = false;
+            final Set<Long> windowStartTimes = new HashSet<>();
+
+            Long previousRecordTimestamp = null;
+
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> 
iterator = windowStore.fetch(
+                    key,
+                    key,
+                    Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+                    // to catch the current record's right window, if it 
exists, without more calls to the store
+                    timestamp + 1)
+            ) {
+                KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+                while (iterator.hasNext()) {
+                    next = iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+                    final long startTime = next.key.window().start();
+
+                    if (startTime == 0) {
+                        combinedWindow = next;
+                        if (next.value.timestamp() < timestamp) {
+                            previousRecordTimestamp = next.value.timestamp();
+                        }

Review comment:
       I think we should always assign the `next.value.timestamp` value to a 
variable with an explicit name, e.g. `windowMaxRecordTimestamp`, because it's 
pretty non-obvious what it means and easy to forget.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -192,29 +191,113 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
             //create left window for new record
             if (!leftWinAlreadyCreated) {
                 final ValueAndTimestamp<Agg> valueAndTime;
-                //there's a right window that the new record could create --> 
new record's left window is not empty
-                if (latestLeftTypeWindow != null) {
+                // if there's a right window that the new record could create 
&& previous record falls within left window -> new record's left window is not 
empty
+                if (previousRecord != null && 
leftWindowNotEmpty(previousRecord, timestamp)) {
                     valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
                 } else {
                     valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
                 }
                 final TimeWindow window = new TimeWindow(timestamp - 
windows.timeDifferenceMs(), timestamp);
                 putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
             }
-            //create right window for new record
             if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
-                final TimeWindow window = new TimeWindow(timestamp + 1, 
timestamp + 1 + windows.timeDifferenceMs());
-                final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(getValueOrNull(rightWinAgg), 
Math.max(rightWinAgg.timestamp(), timestamp));
+                createRightWindow(timestamp, rightWinAgg, key, value, 
closeTime);
+            }
+        }
+
+        /**
+         * Created to handle records that have a timestamp > 0 but < 
timeDifference. These records would create
+         * windows with negative start times, which is not supported. Instead, 
they will fall within the [0, timeDifference]
+         * window, and we will update their right windows as new records come 
in later
+         */
+        private void processEarly(final K key, final V value, final long 
timestamp, final long closeTime) {
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+            //window from [0,timeDifference] that holds all early records
+            KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = 
null;
+            boolean rightWinAlreadyCreated = false;
+            final Set<Long> windowStartTimes = new HashSet<>();
+
+            Long previousRecordTimestamp = null;
+
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> 
iterator = windowStore.fetch(
+                    key,
+                    key,
+                    Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+                    // to catch the current record's right window, if it 
exists, without more calls to the store
+                    timestamp + 1)
+            ) {
+                KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+                while (iterator.hasNext()) {
+                    next = iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+                    final long startTime = next.key.window().start();
+
+                    if (startTime == 0) {
+                        combinedWindow = next;
+                        if (next.value.timestamp() < timestamp) {
+                            previousRecordTimestamp = next.value.timestamp();
+                        }
+
+                    } else if (startTime <= timestamp) {
+                        rightWinAgg = next.value;
+                        putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);

Review comment:
       I think we should add a comment somewhere clarifying how we set 
`previousRecordTimestamp` in `processEarly`:
   Basically, we only need to check and maybe set it when we're on the combined 
window. If it's still null once we're past the combined window, then we know 
the combined window already contained a record later than the current record, 
and in that case we must have already created the right window for the actual 
previous record. Hopefully you can find a better & more concise way to 
explain that 😄 
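
   In case it helps with wording that comment, here is a simplified model of 
the rule. It is a sketch under assumptions, not the PR's code: `StoredWindow` 
and `findPreviousRecordTimestamp` are hypothetical names, and 
`windowMaxRecordTimestamp` stands in for `next.value.timestamp()`.

```java
import java.util.List;

final class CombinedWindowSketch {

    /** Hypothetical stand-in for one entry returned by the window store. */
    public static final class StoredWindow {
        public final long start;
        public final long windowMaxRecordTimestamp; // plays the role of next.value.timestamp()

        public StoredWindow(final long start, final long windowMaxRecordTimestamp) {
            this.start = start;
            this.windowMaxRecordTimestamp = windowMaxRecordTimestamp;
        }
    }

    /** Returns the previous record's timestamp, or null if nothing needs to be done for it. */
    public static Long findPreviousRecordTimestamp(final List<StoredWindow> windows, final long timestamp) {
        Long previousRecordTimestamp = null;
        for (final StoredWindow next : windows) {
            // Only the combined window (start == 0) is consulted.
            if (next.start == 0 && next.windowMaxRecordTimestamp < timestamp) {
                previousRecordTimestamp = next.windowMaxRecordTimestamp;
            }
            // Other windows are deliberately ignored: if the value is still null,
            // the combined window's latest record is not earlier than the current
            // one, so the true previous record's right window already exists.
        }
        return previousRecordTimestamp;
    }
}
```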

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -192,29 +191,113 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
             //create left window for new record
             if (!leftWinAlreadyCreated) {
                 final ValueAndTimestamp<Agg> valueAndTime;
-                //there's a right window that the new record could create --> 
new record's left window is not empty
-                if (latestLeftTypeWindow != null) {
+                // if there's a right window that the new record could create 
&& previous record falls within left window -> new record's left window is not 
empty
+                if (previousRecord != null && 
leftWindowNotEmpty(previousRecord, timestamp)) {
                     valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
                 } else {
                     valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
                 }
                 final TimeWindow window = new TimeWindow(timestamp - 
windows.timeDifferenceMs(), timestamp);
                 putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
             }
-            //create right window for new record
             if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
-                final TimeWindow window = new TimeWindow(timestamp + 1, 
timestamp + 1 + windows.timeDifferenceMs());
-                final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(getValueOrNull(rightWinAgg), 
Math.max(rightWinAgg.timestamp(), timestamp));
+                createRightWindow(timestamp, rightWinAgg, key, value, 
closeTime);
+            }
+        }
+
+        /**
+         * Created to handle records that have a timestamp > 0 but < 
timeDifference. These records would create

Review comment:
       ```suggestion
            * Created to handle records where 0 < timestamp < timeDifferenceMs. 
These records would create
   ```

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -135,7 +127,11 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
             observedStreamTime = Math.max(observedStreamTime, timestamp);
             final long closeTime = observedStreamTime - 
windows.gracePeriodMs();
 
-            //store start times of windows we find
+            if (timestamp < windows.timeDifferenceMs()) {
+                processEarly(key, value, timestamp, closeTime);
+                return;

Review comment:
       Seems like we should move this check up into the top-level `process` 
instead of first calling `processInOrder` and then calling `processEarly`. For 
one thing, since we actually do need to iterate the full range for the early 
records, we can just call `processEarly` without having to decide between 
`processInOrder` and `processReverse`.
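
   Roughly, the dispatch I have in mind looks like the sketch below. The enum, 
the method name, and the `reverseIterationAvailable` flag are all hypothetical 
placeholders; however `process` currently chooses between `processInOrder` and 
`processReverse` would stay as it is.

```java
final class ProcessDispatchSketch {

    /** Hypothetical labels for the three processing paths. */
    public enum Path { EARLY, IN_ORDER, REVERSE }

    public static Path choosePath(final long timestamp,
                                  final long timeDifferenceMs,
                                  final boolean reverseIterationAvailable) {
        // Early records are handled first, before any in-order/reverse decision.
        if (timestamp < timeDifferenceMs) {
            return Path.EARLY;
        }
        // Placeholder for whatever condition process() uses today.
        return reverseIterationAvailable ? Path.REVERSE : Path.IN_ORDER;
    }
}
```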

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -192,29 +191,113 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
             //create left window for new record
             if (!leftWinAlreadyCreated) {
                 final ValueAndTimestamp<Agg> valueAndTime;
-                //there's a right window that the new record could create --> 
new record's left window is not empty
-                if (latestLeftTypeWindow != null) {
+                // if there's a right window that the new record could create 
&& previous record falls within left window -> new record's left window is not 
empty
+                if (previousRecord != null && 
leftWindowNotEmpty(previousRecord, timestamp)) {
                     valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
                 } else {
                     valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
                 }
                 final TimeWindow window = new TimeWindow(timestamp - 
windows.timeDifferenceMs(), timestamp);
                 putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
             }
-            //create right window for new record
             if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
-                final TimeWindow window = new TimeWindow(timestamp + 1, 
timestamp + 1 + windows.timeDifferenceMs());
-                final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(getValueOrNull(rightWinAgg), 
Math.max(rightWinAgg.timestamp(), timestamp));
+                createRightWindow(timestamp, rightWinAgg, key, value, 
closeTime);

Review comment:
       This doesn't look right. Why would we need to pass the `key` and 
`value` in to `createRightWindow`? The distinguishing feature of the current 
record's right window is that it doesn't include the current record at all. I 
see that `createRightWindow` ultimately calls `putAndForward`, which takes a key 
and value, but that just seems misleading. I think we should either pass in 
`null` to `putAndForward` for things we don't need, or better yet (imo) not 
use `putAndForward` for the right window creation and just have a clean 
separation between creation of the right window and everything else.
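
   As a toy illustration of the separation I mean (everything here is a 
hypothetical stand-in: an in-memory map replaces the window store for a single 
key, string concatenation replaces the aggregator, and the method names are 
made up):

```java
import java.util.HashMap;
import java.util.Map;

final class RightWindowSketch {

    // Hypothetical stand-in for one key's slice of the window store:
    // window start time -> aggregate.
    private final Map<Long, String> aggregatesByWindowStart = new HashMap<>();

    /** Windows that contain the current record: its value is folded into the aggregate. */
    public void updateWindow(final long windowStart, final String existingAgg, final String value) {
        aggregatesByWindowStart.put(windowStart, existingAgg + value);
    }

    /**
     * The current record's right window starts at timestamp + 1 and never
     * contains the current record, so the existing aggregate is stored as-is
     * and no record value is needed.
     */
    public void createCurrentRecordRightWindow(final long timestamp, final String rightWinAgg) {
        aggregatesByWindowStart.put(timestamp + 1, rightWinAgg);
    }

    public String aggregateAt(final long windowStart) {
        return aggregatesByWindowStart.get(windowStart);
    }
}
```

   In the real class the key is presumably still needed for the store write; 
the point is only that the record's value never enters the right window.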

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -192,29 +191,113 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
             //create left window for new record
             if (!leftWinAlreadyCreated) {
                 final ValueAndTimestamp<Agg> valueAndTime;
-                //there's a right window that the new record could create --> 
new record's left window is not empty
-                if (latestLeftTypeWindow != null) {
+                // if there's a right window that the new record could create 
&& previous record falls within left window -> new record's left window is not 
empty
+                if (previousRecord != null && 
leftWindowNotEmpty(previousRecord, timestamp)) {
                     valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
                 } else {
                     valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
                 }
                 final TimeWindow window = new TimeWindow(timestamp - 
windows.timeDifferenceMs(), timestamp);
                 putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
             }
-            //create right window for new record
             if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
-                final TimeWindow window = new TimeWindow(timestamp + 1, 
timestamp + 1 + windows.timeDifferenceMs());
-                final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(getValueOrNull(rightWinAgg), 
Math.max(rightWinAgg.timestamp(), timestamp));
+                createRightWindow(timestamp, rightWinAgg, key, value, 
closeTime);
+            }
+        }
+
+        /**
+         * Created to handle records that have a timestamp > 0 but < 
timeDifference. These records would create
+         * windows with negative start times, which is not supported. Instead, 
they will fall within the [0, timeDifference]
+         * window, and we will update their right windows as new records come 
in later
+         */
+        private void processEarly(final K key, final V value, final long 
timestamp, final long closeTime) {
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+            //window from [0,timeDifference] that holds all early records
+            KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = 
null;
+            boolean rightWinAlreadyCreated = false;
+            final Set<Long> windowStartTimes = new HashSet<>();
+
+            Long previousRecordTimestamp = null;
+
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> 
iterator = windowStore.fetch(
+                    key,
+                    key,
+                    Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+                    // to catch the current record's right window, if it 
exists, without more calls to the store
+                    timestamp + 1)
+            ) {
+                KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+                while (iterator.hasNext()) {
+                    next = iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+                    final long startTime = next.key.window().start();
+
+                    if (startTime == 0) {
+                        combinedWindow = next;
+                        if (next.value.timestamp() < timestamp) {
+                            previousRecordTimestamp = next.value.timestamp();
+                        }
+
+                    } else if (startTime <= timestamp) {
+                        rightWinAgg = next.value;
+                        putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+                    } else if (startTime == timestamp + 1) {
+                        rightWinAlreadyCreated = true;
+                    }
+                }
+            }
+
+            // if there wasn't a right window agg found and we need a right 
window for our new record,
+            // the current aggregate in the combined window will go in the new 
record's right window
+            if (rightWinAgg == null && combinedWindow != null && 
combinedWindow.value.timestamp() > timestamp) {

Review comment:
       Just leaving a note for the next reviewer, and making sure I understand 
this myself: 
   If we didn't find a right window agg, that means one of two things:
   1) There were no other windows with startTime <= timestamp, which means 
there are no records earlier than the current one, and no records later than 
timeDifference (but within range of the current record). We take that to mean 
that whatever is in the combined window is an aggregate of records that are all 
to the right of the current record. Note that there could be a record at the 
same timestamp as the current record, but we will check for that before 
actually creating the right window below (by checking `!rightWinAlreadyCreated`).
   2) There is just a single record in the combined window that is earlier than 
the current record. We check for that with the 
`combinedWindow.value.timestamp() > timestamp` condition, and in that case 
create no right window.
   
   By the way, can we move the creation of the right window to just after this 
block, to keep all the relevant logic together?
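
   To double-check my reading, the fallback condition in the diff boils down to 
something like the sketch below. `Agg` and `resolveRightWinAgg` are 
hypothetical names, and `maxRecordTimestamp` stands in for 
`combinedWindow.value.timestamp()`.

```java
final class RightWinAggFallbackSketch {

    /** Hypothetical stand-in for ValueAndTimestamp<Agg>. */
    public static final class Agg {
        public final String value;
        public final long maxRecordTimestamp;

        public Agg(final String value, final long maxRecordTimestamp) {
            this.value = value;
            this.maxRecordTimestamp = maxRecordTimestamp;
        }
    }

    /** Returns the aggregate to use for the current record's right window, or null for none. */
    public static Agg resolveRightWinAgg(final Agg rightWinAgg,
                                         final Agg combinedWindowAgg,
                                         final long timestamp) {
        // Case 1: nothing found while iterating, but the combined window holds
        // a record later than the current one, so reuse its aggregate.
        if (rightWinAgg == null
                && combinedWindowAgg != null
                && combinedWindowAgg.maxRecordTimestamp > timestamp) {
            return combinedWindowAgg;
        }
        // Case 2 (combined window only holds earlier records) falls through
        // with null; otherwise the aggregate found during iteration is kept.
        return rightWinAgg;
    }
}
```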

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -192,29 +191,113 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
             //create left window for new record
             if (!leftWinAlreadyCreated) {
                 final ValueAndTimestamp<Agg> valueAndTime;
-                //there's a right window that the new record could create --> 
new record's left window is not empty
-                if (latestLeftTypeWindow != null) {
+                // if there's a right window that the new record could create 
&& previous record falls within left window -> new record's left window is not 
empty
+                if (previousRecord != null && 
leftWindowNotEmpty(previousRecord, timestamp)) {
                     valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
                 } else {
                     valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
                 }
                 final TimeWindow window = new TimeWindow(timestamp - 
windows.timeDifferenceMs(), timestamp);
                 putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
             }
-            //create right window for new record
             if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
-                final TimeWindow window = new TimeWindow(timestamp + 1, 
timestamp + 1 + windows.timeDifferenceMs());
-                final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(getValueOrNull(rightWinAgg), 
Math.max(rightWinAgg.timestamp(), timestamp));
+                createRightWindow(timestamp, rightWinAgg, key, value, 
closeTime);
+            }
+        }
+
+        /**
+         * Created to handle records that have a timestamp > 0 but < 
timeDifference. These records would create
+         * windows with negative start times, which is not supported. Instead, 
they will fall within the [0, timeDifference]
+         * window, and we will update their right windows as new records come 
in later
+         */
+        private void processEarly(final K key, final V value, final long 
timestamp, final long closeTime) {
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+            //window from [0,timeDifference] that holds all early records
+            KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = 
null;
+            boolean rightWinAlreadyCreated = false;
+            final Set<Long> windowStartTimes = new HashSet<>();
+
+            Long previousRecordTimestamp = null;
+
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> 
iterator = windowStore.fetch(
+                    key,
+                    key,
+                    Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+                    // to catch the current record's right window, if it 
exists, without more calls to the store
+                    timestamp + 1)
+            ) {
+                KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+                while (iterator.hasNext()) {
+                    next = iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+                    final long startTime = next.key.window().start();
+
+                    if (startTime == 0) {
+                        combinedWindow = next;
+                        if (next.value.timestamp() < timestamp) {
+                            previousRecordTimestamp = next.value.timestamp();
+                        }
+
+                    } else if (startTime <= timestamp) {
+                        rightWinAgg = next.value;
+                        putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+                    } else if (startTime == timestamp + 1) {
+                        rightWinAlreadyCreated = true;
+                    }
+                }
+            }
+
+            // if there wasn't a right window agg found and we need a right 
window for our new record,
+            // the current aggregate in the combined window will go in the new 
record's right window
+            if (rightWinAgg == null && combinedWindow != null && 
combinedWindow.value.timestamp() > timestamp) {
+                rightWinAgg = combinedWindow.value;
+            }
+
+            if (combinedWindow == null) {
+                final TimeWindow window = new TimeWindow(0, 
windows.timeDifferenceMs());
+                final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(initializer.apply(), timestamp);
                 putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
+
+            } else {
+                //create the right window for the previous record if the 
previous record exists and the window hasn't already been created
+                if (previousRecordTimestamp != null && 
!windowStartTimes.contains(previousRecordTimestamp + 1)) {
+                    final TimeWindow window = new 
TimeWindow(previousRecordTimestamp + 1, previousRecordTimestamp + 1 + 
windows.timeDifferenceMs());
+                    final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(initializer.apply(), timestamp);
+                    putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
+                }
+                //update the combined window with the new aggregate
+                putAndForward(combinedWindow.key.window(), 
combinedWindow.value, key, value, closeTime, timestamp);
+            }
+            //create right window for new record if needed
+            if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
+                createRightWindow(timestamp, rightWinAgg, key, value, 
closeTime);
             }
         }
 
-        private boolean rightWindowIsNotEmpty(final ValueAndTimestamp<Agg> 
rightWinAgg, final long timestamp) {
-            return rightWinAgg != null && rightWinAgg.timestamp() > timestamp;
+        private void createRightWindow(final long timestamp,
+                                       final ValueAndTimestamp<Agg> 
rightWinAgg,
+                                       final K key,
+                                       final V value,
+                                       final long closeTime) {
+            final TimeWindow window = new TimeWindow(timestamp + 1, timestamp 
+ 1 + windows.timeDifferenceMs());
+            final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(getValueOrNull(rightWinAgg), 
Math.max(rightWinAgg.timestamp(), timestamp));
+            putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);

Review comment:
       It's pretty weird to have to pass `key` and `value` into 
`createRightWindow` when they're not used to create the right window. I notice 
they're used in the log message for dropped late windows, but it seems odd that 
we would ever end up dropping the right window of the current record. If the 
record itself is that old, we should just drop it before processing it at all, 
right? 
   Assuming you do that, it feels a lot more reasonable not to call 
`putAndForward` from `createRightWindow` at all, and instead to do the actual 
put and forward for the right window case inline. 
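   To make the suggestion concrete, here is a rough sketch of the shape I have 
in mind. It is a simplified stand-in, not the real processor: the class 
`RightWindowSketch`, its map-based store, and the `isExpired` helper are all 
hypothetical, and it assumes a window counts as expired once its end is at or 
before `closeTime`. The point is only that the late check happens once, up 
front, so the right window can be put inline without `key` or `value`.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified, hypothetical stand-in for the sliding-window processor; these are
// not the real Kafka Streams classes or signatures.
final class RightWindowSketch {
    private final long timeDifferenceMs;
    // window start time -> max record timestamp seen in that window
    private final Map<Long, Long> windowStore = new HashMap<>();
    private int droppedRecords = 0;

    RightWindowSketch(final long timeDifferenceMs) {
        this.timeDifferenceMs = timeDifferenceMs;
    }

    // Assumption: a window is expired once its end is at or before closeTime.
    private boolean isExpired(final long windowEnd, final long closeTime) {
        return windowEnd <= closeTime;
    }

    // Returns true if the record was processed, false if it was dropped as late.
    boolean process(final long timestamp, final long rightWinMaxTimestamp, final long closeTime) {
        final long rightWindowStart = timestamp + 1;
        final long rightWindowEnd = rightWindowStart + timeDifferenceMs;

        // The right window is the latest-ending window this record can affect, so
        // if it is already expired the whole record can be dropped up front.
        if (isExpired(rightWindowEnd, closeTime)) {
            droppedRecords++;
            return false;
        }

        // ... handling of existing windows and the left window elided ...

        // Put the right window inline. It cannot be expired at this point, so no
        // key/value is needed for a dropped-window log message.
        if (rightWinMaxTimestamp > timestamp) {
            windowStore.put(rightWindowStart, rightWinMaxTimestamp);
        }
        return true;
    }

    int droppedRecords() {
        return droppedRecords;
    }

    // Max timestamp stored for the right window of the given record, or null if none.
    Long rightWindowMaxTimestamp(final long recordTimestamp) {
        return windowStore.get(recordTimestamp + 1);
    }
}
```

   With `timeDifferenceMs = 10`, a record at timestamp 5 has a right window 
ending at 16, so under the assumption above it would be dropped when 
`closeTime` is 16 and processed when `closeTime` is 15.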

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -192,29 +191,113 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
             //create left window for new record
             if (!leftWinAlreadyCreated) {
                 final ValueAndTimestamp<Agg> valueAndTime;
-                //there's a right window that the new record could create --> 
new record's left window is not empty
-                if (latestLeftTypeWindow != null) {
+                // if there's a right window that the new record could create 
&& previous record falls within left window -> new record's left window is not 
empty
+                if (previousRecord != null && 
leftWindowNotEmpty(previousRecord, timestamp)) {
                     valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
                 } else {
                     valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
                 }
                 final TimeWindow window = new TimeWindow(timestamp - 
windows.timeDifferenceMs(), timestamp);
                 putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
             }
-            //create right window for new record
             if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
-                final TimeWindow window = new TimeWindow(timestamp + 1, 
timestamp + 1 + windows.timeDifferenceMs());
-                final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(getValueOrNull(rightWinAgg), 
Math.max(rightWinAgg.timestamp(), timestamp));
+                createRightWindow(timestamp, rightWinAgg, key, value, 
closeTime);
+            }
+        }
+
+        /**
+         * Created to handle records that have a timestamp > 0 but < 
timeDifference. These records would create
+         * windows with negative start times, which is not supported. Instead, 
they will fall within the [0, timeDifference]
+         * window, and we will update their right windows as new records come 
in later
+         */
+        private void processEarly(final K key, final V value, final long 
timestamp, final long closeTime) {
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+            //window from [0,timeDifference] that holds all early records
+            KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = 
null;
+            boolean rightWinAlreadyCreated = false;
+            final Set<Long> windowStartTimes = new HashSet<>();
+
+            Long previousRecordTimestamp = null;
+
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> 
iterator = windowStore.fetch(
+                    key,
+                    key,
+                    Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+                    // to catch the current record's right window, if it 
exists, without more calls to the store
+                    timestamp + 1)
+            ) {
+                KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+                while (iterator.hasNext()) {
+                    next = iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+                    final long startTime = next.key.window().start();
+
+                    if (startTime == 0) {
+                        combinedWindow = next;
+                        if (next.value.timestamp() < timestamp) {
+                            previousRecordTimestamp = next.value.timestamp();
+                        }
+
+                    } else if (startTime <= timestamp) {
+                        rightWinAgg = next.value;
+                        putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+                    } else if (startTime == timestamp + 1) {
+                        rightWinAlreadyCreated = true;
+                    }
+                }
+            }
+
+            // if there wasn't a right window agg found and we need a right 
window for our new record,
+            // the current aggregate in the combined window will go in the new 
record's right window
+            if (rightWinAgg == null && combinedWindow != null && 
combinedWindow.value.timestamp() > timestamp) {
+                rightWinAgg = combinedWindow.value;
+            }
+
+            if (combinedWindow == null) {
+                final TimeWindow window = new TimeWindow(0, 
windows.timeDifferenceMs());
+                final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(initializer.apply(), timestamp);
                 putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
+
+            } else {
+                //create the right window for the previous record if the 
previous record exists and the window hasn't already been created
+                if (previousRecordTimestamp != null && 
!windowStartTimes.contains(previousRecordTimestamp + 1)) {
+                    final TimeWindow window = new 
TimeWindow(previousRecordTimestamp + 1, previousRecordTimestamp + 1 + 
windows.timeDifferenceMs());
+                    final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(initializer.apply(), timestamp);
+                    putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
+                }
+                //update the combined window with the new aggregate
+                putAndForward(combinedWindow.key.window(), 
combinedWindow.value, key, value, closeTime, timestamp);
+            }
+            //create right window for new record if needed
+            if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
+                createRightWindow(timestamp, rightWinAgg, key, value, 
closeTime);
             }
         }
 
-        private boolean rightWindowIsNotEmpty(final ValueAndTimestamp<Agg> 
rightWinAgg, final long timestamp) {
-            return rightWinAgg != null && rightWinAgg.timestamp() > timestamp;
+        private void createRightWindow(final long timestamp,
+                                       final ValueAndTimestamp<Agg> 
rightWinAgg,
+                                       final K key,
+                                       final V value,
+                                       final long closeTime) {
+            final TimeWindow window = new TimeWindow(timestamp + 1, timestamp 
+ 1 + windows.timeDifferenceMs());
+            final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(getValueOrNull(rightWinAgg), 
Math.max(rightWinAgg.timestamp(), timestamp));
+            putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
+        }
+
+        private boolean leftWindowNotEmpty(final long previousTimestamp, final 
long currentTimestamp) {
+            return currentTimestamp - windows.timeDifferenceMs() <= 
previousTimestamp;
         }
 
-        private boolean isLeftWindow(final KeyValue<Windowed<K>, 
ValueAndTimestamp<Agg>> window) {
-            return window.key.window().end() == window.value.timestamp();
+        // previous record's right window does not already exist and current 
record falls within previous record's right window
+        private boolean rightWindowNecessaryAndPossible(final Set<Long> 
windowStartTimes,
+                                                        final long 
previousRightWindowStart,
+                                                        final long 
currentRecordTimestamp) {
+            return !windowStartTimes.contains(previousRightWindowStart) && 
previousRightWindowStart + windows.timeDifferenceMs() >= currentRecordTimestamp;
+        }

Review comment:
       `rightWindowNecessaryAndPossible` isn't used anymore, right?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

