ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r483835732



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -192,29 +213,125 @@ public void processInOrder(final K key, final V value, final long timestamp) {
             //create left window for new record
             if (!leftWinAlreadyCreated) {
                 final ValueAndTimestamp<Agg> valueAndTime;
-                //there's a right window that the new record could create --> new record's left window is not empty
-                if (latestLeftTypeWindow != null) {
+                // if there's a right window that the new record could create && previous record falls within left window -> new record's left window is not empty
+                if (previousRecordTimestamp != null && leftWindowNotEmpty(previousRecordTimestamp, timestamp)) {
                     valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp);
                 } else {
                     valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
                 }
                 final TimeWindow window = new TimeWindow(timestamp - windows.timeDifferenceMs(), timestamp);
                 putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
             }
-            //create right window for new record
             if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) {
-                final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs());
-                final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp));
+                createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+            }
+        }
+
+        /**
+         * Created to handle records where 0 < timestamp < timeDifferenceMs. These records would create
+         * windows with negative start times, which is not supported. Instead, they will fall within the [0, timeDifferenceMs]
+         * window, and we will update or create their right windows as new records come in later
+         */
+        private void processEarly(final K key, final V value, final long timestamp, final long closeTime) {
+            // A window from [0, timeDifferenceMs] that holds all early records
+            KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = null;
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+            boolean rightWinAlreadyCreated = false;
+            final Set<Long> windowStartTimes = new HashSet<>();
+
+            Long previousRecordTimestamp = null;
+
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch(
+                    key,
+                    key,
+                    Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+                    // to catch the current record's right window, if it exists, without more calls to the store
+                    timestamp + 1)
+            ) {
+                KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+                while (iterator.hasNext()) {
+                    next = iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+                    final long startTime = next.key.window().start();
+                    final long windowMaxRecordTimestamp = next.value.timestamp();
+
+                    if (startTime == 0) {
+                        combinedWindow = next;
+                        if (windowMaxRecordTimestamp < timestamp) {
+                            // If maxRecordTimestamp > timestamp, the current record is out-of-order, meaning that the
+                            // previous record's right window would have been created already by other records. This
+                            // will always be true for early records, as they all fall within [0, timeDifferenceMs].

Review comment:
       > the statement `if (windowMaxRecordTimestamp < timestamp) { previousRecordTimestamp = windowMaxRecordTimestamp; }` is somewhat self-explanatory
   
   I think that's fair. My concern was with the `windowMaxRecordTimestamp > timestamp` case. In that situation we don't know, and can't know, what the `previousRecordTimestamp` is: all we save is the maxTimestamp of the combined window, so the information is lost. I just thought we should clarify that this is actually ok, because if `windowMaxRecordTimestamp > timestamp` then we must have already created the right window of the previous record. So I agree that the `!windowStartTimes.contains(previousRecordTimestamp + 1)` check would logically catch this, but I don't think we can remove either check:
   
   If we remove the `if (windowMaxRecordTimestamp < timestamp)` guard around `previousRecordTimestamp = windowMaxRecordTimestamp;`, then we might set `previousRecordTimestamp` to a value that isn't actually the timestamp of a previous record, and we should make sure all variables are always accurate.
   
   If we remove the `!windowStartTimes.contains(previousRecordTimestamp + 1)` check, then we could miss a case where the previous record's right window was already created, but by a record outside of the combined window.
   
   So, I think the code itself is complete as is. But we can probably simplify the comments to make it more understandable; it seems reasonable to remove the comment here entirely. Then maybe you could leave a brief comment down below, where we create the previous record's right window, saying "if the `previousRecordTimestamp` is null, either there is no previous record or its right window has already been created".
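
   To make the interplay of the two checks concrete, here is a minimal standalone sketch. It is not the code in this PR: the class `PreviousRightWindowSketch` and both method names are hypothetical, and the only things taken from the diff are the `windowMaxRecordTimestamp < timestamp` guard, the `windowStartTimes` set, and the fact that a record's right window starts at its timestamp + 1.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical illustration only; not part of KStreamSlidingWindowAggregate.
class PreviousRightWindowSketch {

    // Check 1: only trust the combined window's max timestamp as the previous
    // record's timestamp when the current record is in order. Otherwise the
    // previous record's timestamp is unknown, so return null.
    static Long previousRecordTimestamp(final long windowMaxRecordTimestamp, final long timestamp) {
        if (windowMaxRecordTimestamp < timestamp) {
            return windowMaxRecordTimestamp;
        }
        return null;
    }

    // Check 2: even with a known previous record, its right window (which
    // starts at previousRecordTimestamp + 1) may already exist in the store.
    static boolean shouldCreatePreviousRightWindow(final Long previousRecordTimestamp,
                                                   final Set<Long> windowStartTimes) {
        return previousRecordTimestamp != null
            && !windowStartTimes.contains(previousRecordTimestamp + 1);
    }

    public static void main(final String[] args) {
        final Set<Long> onlyCombined = new HashSet<>(Arrays.asList(0L));
        final Set<Long> withRightWindow = new HashSet<>(Arrays.asList(0L, 6L));

        // In-order record, right window of the previous record missing: true
        System.out.println(shouldCreatePreviousRightWindow(previousRecordTimestamp(5L, 10L), onlyCombined));
        // Out-of-order record, previous timestamp unknown: false
        System.out.println(shouldCreatePreviousRightWindow(previousRecordTimestamp(10L, 5L), onlyCombined));
        // In-order record, but a window starting at 5 + 1 already exists: false
        System.out.println(shouldCreatePreviousRightWindow(previousRecordTimestamp(5L, 10L), withRightWindow));
    }
}
```

   Dropping the first check would let `previousRecordTimestamp` hold a timestamp later than the current record, and dropping the second would recreate a right window that some other record already created, which is why both stay.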



