ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r481460413



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -192,29 +190,125 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
             //create left window for new record
             if (!leftWinAlreadyCreated) {
                 final ValueAndTimestamp<Agg> valueAndTime;
-                //there's a right window that the new record could create --> 
new record's left window is not empty
-                if (latestLeftTypeWindow != null) {
+                // if there's a right window that the new record could create 
&& previous record falls within left window -> new record's left window is not 
empty
+                if (previousRecordTimestamp != null && 
leftWindowNotEmpty(previousRecordTimestamp, timestamp)) {
                     valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
                 } else {
                     valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
                 }
                 final TimeWindow window = new TimeWindow(timestamp - 
windows.timeDifferenceMs(), timestamp);
                 putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
             }
-            //create right window for new record
             if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
-                final TimeWindow window = new TimeWindow(timestamp + 1, 
timestamp + 1 + windows.timeDifferenceMs());
-                final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(getValueOrNull(rightWinAgg), 
Math.max(rightWinAgg.timestamp(), timestamp));
+                createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+            }
+        }
+
+        /**
+         * Created to handle records where 0 < timestamp < timeDifferenceMs. 
These records would create
+         * windows with negative start times, which is not supported. Instead, 
they will fall within the [0, timeDifferenceMs]
+         * window, and we will update or create their right windows as new 
records come in later
+         */
+        private void processEarly(final K key, final V value, final long 
timestamp, final long closeTime) {
+            // A window from [0, timeDifferenceMs] that holds all early records
+            KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = 
null;
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+            boolean rightWinAlreadyCreated = false;
+            final Set<Long> windowStartTimes = new HashSet<>();
+
+            Long previousRecordTimestamp = null;
+
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> 
iterator = windowStore.fetch(
+                    key,
+                    key,
+                    Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+                    // to catch the current record's right window, if it 
exists, without more calls to the store
+                    timestamp + 1)
+            ) {
+                KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+                while (iterator.hasNext()) {
+                    next = iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+                    final long startTime = next.key.window().start();
+                    final long windowMaxRecordTimestamp = 
next.value.timestamp();
+
+                    if (startTime == 0) {
+                        combinedWindow = next;
+                        if (windowMaxRecordTimestamp < timestamp) {
+                            // If maxRecordTimestamp > timestamp, the current 
record is out-of-order, meaning that the
+                            // previous record's right window would have been 
created already by other records. This
+                            // will always be true for early records, as they 
all fall within [0, timeDifferenceMs].
+                            previousRecordTimestamp = windowMaxRecordTimestamp;
+                        }
+
+                    } else if (startTime <= timestamp) {
+                        rightWinAgg = next.value;
+                        putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+                    } else if (startTime == timestamp + 1) {
+                        rightWinAlreadyCreated = true;
+                    }
+                }
+            }
+
+            // if there wasn't a right window agg found and we need a right 
window for our new record,
+            // the current aggregate in the combined window will go in the new 
record's right window
+            if (rightWinAgg == null && combinedWindow != null && 
combinedWindow.value.timestamp() > timestamp) {
+                rightWinAgg = combinedWindow.value;
+            }
+
+            //create the right window for the previous record if the previous 
record exists and the window hasn't already been created
+            if (previousRecordTimestamp != null && 
!windowStartTimes.contains(previousRecordTimestamp + 1)) {
+                final TimeWindow window = new 
TimeWindow(previousRecordTimestamp + 1, previousRecordTimestamp + 1 + 
windows.timeDifferenceMs());
+                final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(initializer.apply(), timestamp);
                 putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
             }
+
+            if (combinedWindow == null) {
+                final TimeWindow window = new TimeWindow(0, 
windows.timeDifferenceMs());
+                final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(initializer.apply(), timestamp);
+                putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
+
+            } else {
+                //update the combined window with the new aggregate
+                putAndForward(combinedWindow.key.window(), 
combinedWindow.value, key, value, closeTime, timestamp);
+            }
+            //create right window for new record if needed
+            if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
+                createCurrentRecordRightWindow(timestamp, rightWinAgg, key);

Review comment:
       Sorry, I think my previous comment was a bit unclear. I meant that we 
should move this block up to where we set the `rightWinAgg` right after the 
loop, since this is what we actually use the `rightWinAgg` for.
   I think the "create right window for previous record" logic is fine wherever it is.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -254,35 +257,43 @@ private void processEarly(final K key, final V value, 
final long timestamp, fina
                 rightWinAgg = combinedWindow.value;
             }
 
+            //create the right window for the previous record if the previous 
record exists and the window hasn't already been created
+            if (previousRecordTimestamp != null && 
!windowStartTimes.contains(previousRecordTimestamp + 1)) {
+                final TimeWindow window = new 
TimeWindow(previousRecordTimestamp + 1, previousRecordTimestamp + 1 + 
windows.timeDifferenceMs());
+                final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(initializer.apply(), timestamp);
+                putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
+            }
+
             if (combinedWindow == null) {
                 final TimeWindow window = new TimeWindow(0, 
windows.timeDifferenceMs());
                 final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(initializer.apply(), timestamp);
                 putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
 
             } else {
-                //create the right window for the previous record if the 
previous record exists and the window hasn't already been created
-                if (previousRecordTimestamp != null && 
!windowStartTimes.contains(previousRecordTimestamp + 1)) {
-                    final TimeWindow window = new 
TimeWindow(previousRecordTimestamp + 1, previousRecordTimestamp + 1 + 
windows.timeDifferenceMs());
-                    final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(initializer.apply(), timestamp);
-                    putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
-                }
                 //update the combined window with the new aggregate
                 putAndForward(combinedWindow.key.window(), 
combinedWindow.value, key, value, closeTime, timestamp);
             }
             //create right window for new record if needed
             if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
-                createRightWindow(timestamp, rightWinAgg, key, value, 
closeTime);
+                createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
             }
         }
 
-        private void createRightWindow(final long timestamp,
-                                       final ValueAndTimestamp<Agg> 
rightWinAgg,
-                                       final K key,
-                                       final V value,
-                                       final long closeTime) {
+        private void createCurrentRecordRightWindow(final long timestamp,
+                                                    final 
ValueAndTimestamp<Agg> rightWinAgg,
+                                                    final K key) {
             final TimeWindow window = new TimeWindow(timestamp + 1, timestamp 
+ 1 + windows.timeDifferenceMs());
-            final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(getValueOrNull(rightWinAgg), 
Math.max(rightWinAgg.timestamp(), timestamp));
-            putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
+            final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(getValueOrNull(rightWinAgg), rightWinAgg.timestamp());

Review comment:
       Do we need to use `getValueOrNull` here? It seems like `rightWinAgg` 
should never be null if we are using it to create a right window.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -311,13 +322,8 @@ private void putAndForward(final Window window,
             if (windowEnd > closeTime) {
                 //get aggregate from existing window
                 final Agg oldAgg = getValueOrNull(valueAndTime);

Review comment:
       Same here, can this ever be null? It doesn't seem like it: even if it's 
a new window, we still pass in the initializer value, so `valueAndTime` won't 
ever be null. ...right?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -192,29 +191,113 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
             //create left window for new record
             if (!leftWinAlreadyCreated) {
                 final ValueAndTimestamp<Agg> valueAndTime;
-                //there's a right window that the new record could create --> 
new record's left window is not empty
-                if (latestLeftTypeWindow != null) {
+                // if there's a right window that the new record could create 
&& previous record falls within left window -> new record's left window is not 
empty
+                if (previousRecord != null && 
leftWindowNotEmpty(previousRecord, timestamp)) {
                     valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
                 } else {
                     valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
                 }
                 final TimeWindow window = new TimeWindow(timestamp - 
windows.timeDifferenceMs(), timestamp);
                 putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
             }
-            //create right window for new record
             if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
-                final TimeWindow window = new TimeWindow(timestamp + 1, 
timestamp + 1 + windows.timeDifferenceMs());
-                final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(getValueOrNull(rightWinAgg), 
Math.max(rightWinAgg.timestamp(), timestamp));
+                createRightWindow(timestamp, rightWinAgg, key, value, 
closeTime);
+            }
+        }
+
+        /**
+         * Created to handle records that have a timestamp > 0 but < 
timeDifference. These records would create
+         * windows with negative start times, which is not supported. Instead, 
they will fall within the [0, timeDifference]
+         * window, and we will update their right windows as new records come 
in later
+         */
+        private void processEarly(final K key, final V value, final long 
timestamp, final long closeTime) {
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+            //window from [0,timeDifference] that holds all early records
+            KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = 
null;
+            boolean rightWinAlreadyCreated = false;
+            final Set<Long> windowStartTimes = new HashSet<>();
+
+            Long previousRecordTimestamp = null;
+
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> 
iterator = windowStore.fetch(
+                    key,
+                    key,
+                    Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+                    // to catch the current record's right window, if it 
exists, without more calls to the store
+                    timestamp + 1)
+            ) {
+                KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+                while (iterator.hasNext()) {
+                    next = iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+                    final long startTime = next.key.window().start();
+
+                    if (startTime == 0) {
+                        combinedWindow = next;
+                        if (next.value.timestamp() < timestamp) {
+                            previousRecordTimestamp = next.value.timestamp();
+                        }
+
+                    } else if (startTime <= timestamp) {
+                        rightWinAgg = next.value;
+                        putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+                    } else if (startTime == timestamp + 1) {
+                        rightWinAlreadyCreated = true;
+                    }
+                }
+            }
+
+            // if there wasn't a right window agg found and we need a right 
window for our new record,
+            // the current aggregate in the combined window will go in the new 
record's right window
+            if (rightWinAgg == null && combinedWindow != null && 
combinedWindow.value.timestamp() > timestamp) {
+                rightWinAgg = combinedWindow.value;
+            }
+
+            if (combinedWindow == null) {
+                final TimeWindow window = new TimeWindow(0, 
windows.timeDifferenceMs());
+                final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(initializer.apply(), timestamp);
                 putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
+
+            } else {
+                //create the right window for the previous record if the 
previous record exists and the window hasn't already been created
+                if (previousRecordTimestamp != null && 
!windowStartTimes.contains(previousRecordTimestamp + 1)) {
+                    final TimeWindow window = new 
TimeWindow(previousRecordTimestamp + 1, previousRecordTimestamp + 1 + 
windows.timeDifferenceMs());
+                    final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(initializer.apply(), timestamp);
+                    putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
+                }
+                //update the combined window with the new aggregate
+                putAndForward(combinedWindow.key.window(), 
combinedWindow.value, key, value, closeTime, timestamp);
+            }
+            //create right window for new record if needed
+            if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
+                createRightWindow(timestamp, rightWinAgg, key, value, 
closeTime);
             }
         }
 
-        private boolean rightWindowIsNotEmpty(final ValueAndTimestamp<Agg> 
rightWinAgg, final long timestamp) {
-            return rightWinAgg != null && rightWinAgg.timestamp() > timestamp;
+        private void createRightWindow(final long timestamp,
+                                       final ValueAndTimestamp<Agg> 
rightWinAgg,
+                                       final K key,
+                                       final V value,
+                                       final long closeTime) {
+            final TimeWindow window = new TimeWindow(timestamp + 1, timestamp 
+ 1 + windows.timeDifferenceMs());
+            final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(getValueOrNull(rightWinAgg), 
Math.max(rightWinAgg.timestamp(), timestamp));
+            putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);

Review comment:
       Oh good point, we definitely need the key. But I think separating them 
turned out well 

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -192,29 +191,113 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
             //create left window for new record
             if (!leftWinAlreadyCreated) {
                 final ValueAndTimestamp<Agg> valueAndTime;
-                //there's a right window that the new record could create --> 
new record's left window is not empty
-                if (latestLeftTypeWindow != null) {
+                // if there's a right window that the new record could create 
&& previous record falls within left window -> new record's left window is not 
empty
+                if (previousRecord != null && 
leftWindowNotEmpty(previousRecord, timestamp)) {
                     valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
                 } else {
                     valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
                 }
                 final TimeWindow window = new TimeWindow(timestamp - 
windows.timeDifferenceMs(), timestamp);
                 putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
             }
-            //create right window for new record
             if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
-                final TimeWindow window = new TimeWindow(timestamp + 1, 
timestamp + 1 + windows.timeDifferenceMs());
-                final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(getValueOrNull(rightWinAgg), 
Math.max(rightWinAgg.timestamp(), timestamp));
+                createRightWindow(timestamp, rightWinAgg, key, value, 
closeTime);
+            }
+        }
+
+        /**
+         * Created to handle records that have a timestamp > 0 but < 
timeDifference. These records would create
+         * windows with negative start times, which is not supported. Instead, 
they will fall within the [0, timeDifference]
+         * window, and we will update their right windows as new records come 
in later
+         */
+        private void processEarly(final K key, final V value, final long 
timestamp, final long closeTime) {
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+            //window from [0,timeDifference] that holds all early records
+            KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = 
null;
+            boolean rightWinAlreadyCreated = false;
+            final Set<Long> windowStartTimes = new HashSet<>();
+
+            Long previousRecordTimestamp = null;
+
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> 
iterator = windowStore.fetch(
+                    key,
+                    key,
+                    Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+                    // to catch the current record's right window, if it 
exists, without more calls to the store
+                    timestamp + 1)
+            ) {
+                KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+                while (iterator.hasNext()) {
+                    next = iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+                    final long startTime = next.key.window().start();
+
+                    if (startTime == 0) {
+                        combinedWindow = next;
+                        if (next.value.timestamp() < timestamp) {
+                            previousRecordTimestamp = next.value.timestamp();
+                        }
+
+                    } else if (startTime <= timestamp) {
+                        rightWinAgg = next.value;
+                        putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+                    } else if (startTime == timestamp + 1) {
+                        rightWinAlreadyCreated = true;
+                    }
+                }
+            }
+
+            // if there wasn't a right window agg found and we need a right 
window for our new record,
+            // the current aggregate in the combined window will go in the new 
record's right window
+            if (rightWinAgg == null && combinedWindow != null && 
combinedWindow.value.timestamp() > timestamp) {
+                rightWinAgg = combinedWindow.value;
+            }
+
+            if (combinedWindow == null) {
+                final TimeWindow window = new TimeWindow(0, 
windows.timeDifferenceMs());
+                final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(initializer.apply(), timestamp);
                 putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
+
+            } else {
+                //create the right window for the previous record if the 
previous record exists and the window hasn't already been created
+                if (previousRecordTimestamp != null && 
!windowStartTimes.contains(previousRecordTimestamp + 1)) {
+                    final TimeWindow window = new 
TimeWindow(previousRecordTimestamp + 1, previousRecordTimestamp + 1 + 
windows.timeDifferenceMs());
+                    final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(initializer.apply(), timestamp);
+                    putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
+                }
+                //update the combined window with the new aggregate
+                putAndForward(combinedWindow.key.window(), 
combinedWindow.value, key, value, closeTime, timestamp);
+            }
+            //create right window for new record if needed
+            if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
+                createRightWindow(timestamp, rightWinAgg, key, value, 
closeTime);
             }
         }
 
-        private boolean rightWindowIsNotEmpty(final ValueAndTimestamp<Agg> 
rightWinAgg, final long timestamp) {
-            return rightWinAgg != null && rightWinAgg.timestamp() > timestamp;
+        private void createRightWindow(final long timestamp,
+                                       final ValueAndTimestamp<Agg> 
rightWinAgg,
+                                       final K key,
+                                       final V value,
+                                       final long closeTime) {
+            final TimeWindow window = new TimeWindow(timestamp + 1, timestamp 
+ 1 + windows.timeDifferenceMs());
+            final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(getValueOrNull(rightWinAgg), 
Math.max(rightWinAgg.timestamp(), timestamp));
+            putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
+        }
+
+        private boolean leftWindowNotEmpty(final long previousTimestamp, final 
long currentTimestamp) {
+            return currentTimestamp - windows.timeDifferenceMs() <= 
previousTimestamp;
         }
 
-        private boolean isLeftWindow(final KeyValue<Windowed<K>, 
ValueAndTimestamp<Agg>> window) {
-            return window.key.window().end() == window.value.timestamp();
+        // previous record's right window does not already exist and current 
record falls within previous record's right window
+        private boolean rightWindowNecessaryAndPossible(final Set<Long> 
windowStartTimes,
+                                                        final long 
previousRightWindowStart,
+                                                        final long 
currentRecordTimestamp) {
+            return !windowStartTimes.contains(previousRightWindowStart) && 
previousRightWindowStart + windows.timeDifferenceMs() >= currentRecordTimestamp;
+        }

Review comment:
       I think you mean `processInOrder`, not `processEarly` (this is correct 
in the code; it's just the latest comment that doesn't match up)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to