ableegoldman commented on a change in pull request #9239:
URL: https://github.com/apache/kafka/pull/9239#discussion_r485927114



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -150,23 +152,46 @@ public void process(final K key, final V value) {
                 return;
             }
 
-            processInOrder(key, value, inputRecordTimestamp, closeTime);
-        }
-
-        public void processInOrder(final K key, final V value, final long 
inputRecordTimestamp, final long closeTime) {
+            if (reverseIteratorPossible == null) {
+                try {
+                    windowStore.backwardFetch(key, 0L, 0L);
+                    reverseIteratorPossible = true;
+                    log.debug("Sliding Windows aggregate using a reverse 
iterator");
+                } catch (final UnsupportedOperationException e)  {
+                    reverseIteratorPossible = false;
+                    log.debug("Sliding Windows aggregate using a forward 
iterator");
+                }
+            }
 
             final Set<Long> windowStartTimes = new HashSet<>();
 
             // aggregate that will go in the current record’s left/right 
window (if needed)
-            ValueAndTimestamp<Agg> leftWinAgg = null;
-            ValueAndTimestamp<Agg> rightWinAgg = null;
+            final ValueAndTimestamp<Agg> leftWinAgg = null;
+            final ValueAndTimestamp<Agg> rightWinAgg = null;
 
             //if current record's left/right windows already exist
-            boolean leftWinAlreadyCreated = false;
-            boolean rightWinAlreadyCreated = false;
+            final boolean leftWinAlreadyCreated = false;
+            final boolean rightWinAlreadyCreated = false;
 
-            Long previousRecordTimestamp = null;
+            final Long previousRecordTimestamp = null;

Review comment:
       Why create all of these here and then pass the uninitialized values into 
`processInOrder/processReverse`? If we only need them within the `processX` 
methods, let's just keep them there

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -184,6 +209,7 @@ public void processInOrder(final K key, final V value, 
final long inputRecordTim
 
                     if (endTime < inputRecordTimestamp) {
                         leftWinAgg = windowBeingProcessed.value;
+                        // update to store the previous record

Review comment:
       This comment doesn't really add much

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -205,38 +231,69 @@ public void processInOrder(final K key, final V value, 
final long inputRecordTim
                     }
                 }
             }
+            createWindows(key, value, inputRecordTimestamp, closeTime, 
windowStartTimes, rightWinAgg, leftWinAgg, leftWinAlreadyCreated, 
rightWinAlreadyCreated, previousRecordTimestamp);
+        }
 
-            //create right window for previous record
-            if (previousRecordTimestamp != null) {
-                final long previousRightWinStart = previousRecordTimestamp + 1;
-                if (rightWindowNecessaryAndPossible(windowStartTimes, 
previousRightWinStart, inputRecordTimestamp)) {
-                    final TimeWindow window = new 
TimeWindow(previousRightWinStart, previousRightWinStart + 
windows.timeDifferenceMs());
-                    final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(initializer.apply(), inputRecordTimestamp);
-                    updateWindowAndForward(window, valueAndTime, key, value, 
closeTime, inputRecordTimestamp);
-                }
-            }
-
-            //create left window for new record
-            if (!leftWinAlreadyCreated) {
-                final ValueAndTimestamp<Agg> valueAndTime;
-                // if there's a right window that the new record could create 
&& previous record falls within left window -> new record's left window is not 
empty
-                if (leftWindowNotEmpty(previousRecordTimestamp, 
inputRecordTimestamp)) {
-                    valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
inputRecordTimestamp);
-                } else {
-                    valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
inputRecordTimestamp);
+        public void processReverse(final K key,
+                                   final V value,
+                                   final long inputRecordTimestamp,
+                                   final long closeTime,
+                                   final Set<Long> windowStartTimes,
+                                   ValueAndTimestamp<Agg> leftWinAgg,
+                                   ValueAndTimestamp<Agg> rightWinAgg,
+                                   boolean leftWinAlreadyCreated,
+                                   boolean rightWinAlreadyCreated,
+                                   Long previousRecordTimestamp) {
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> 
iterator = windowStore.backwardFetch(
+                    key,
+                    key,
+                    Math.max(0, inputRecordTimestamp - 2 * 
windows.timeDifferenceMs()),
+                    // to catch the current record's right window, if it 
exists, without more calls to the store
+                    inputRecordTimestamp + 1)
+            ) {
+                while (iterator.hasNext()) {
+                    final KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> 
windowBeingProcessed = iterator.next();
+                    final long startTime = 
windowBeingProcessed.key.window().start();
+                    windowStartTimes.add(startTime);
+                    final long endTime = startTime + 
windows.timeDifferenceMs();
+                    final long windowMaxRecordTimestamp = 
windowBeingProcessed.value.timestamp();
+                    if (startTime == inputRecordTimestamp + 1) {
+                        //determine if current record's right window exists, 
will only be true at most once, on the first pass
+                        rightWinAlreadyCreated = true;
+                    } else if (endTime > inputRecordTimestamp) {
+                        if (rightWinAgg == null) {
+                            rightWinAgg = windowBeingProcessed.value;
+                        }
+                        
updateWindowAndForward(windowBeingProcessed.key.window(), 
windowBeingProcessed.value, key, value, closeTime, inputRecordTimestamp);
+                    } else if (endTime == inputRecordTimestamp) {
+                        leftWinAlreadyCreated = true;
+                        
updateWindowAndForward(windowBeingProcessed.key.window(), 
windowBeingProcessed.value, key, value, closeTime, inputRecordTimestamp);
+                        if (windowMaxRecordTimestamp < inputRecordTimestamp) {
+                            previousRecordTimestamp = windowMaxRecordTimestamp;
+                        } else {
+                            return;
+                        }
+                    } else if (endTime < inputRecordTimestamp) {
+                        leftWinAgg = windowBeingProcessed.value;
+                        previousRecordTimestamp = windowMaxRecordTimestamp;
+                        break;
+                    } else {
+                        log.error(
+                            "Unexpected window with start {} found when 
processing record at {} in `KStreamSlidingWindowAggregate`.",
+                            startTime, inputRecordTimestamp
+                        );
+                        throw new IllegalStateException("Unexpected window 
found when processing sliding windows");
+                    }
                 }
-                final TimeWindow window = new TimeWindow(inputRecordTimestamp 
- windows.timeDifferenceMs(), inputRecordTimestamp);
-                updateWindowAndForward(window, valueAndTime, key, value, 
closeTime, inputRecordTimestamp);
-            }
-            if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
inputRecordTimestamp)) {
-                createCurrentRecordRightWindow(inputRecordTimestamp, 
rightWinAgg, key);
             }
+            createWindows(key, value, inputRecordTimestamp, closeTime, 
windowStartTimes, rightWinAgg, leftWinAgg, leftWinAlreadyCreated, 
rightWinAlreadyCreated, previousRecordTimestamp);
         }
 
         /**
-         * Created to handle records where 0 < inputRecordTimestamp < 
timeDifferenceMs. These records would create
-         * windows with negative start times, which is not supported. Instead, 
we will put them into the [0, timeDifferenceMs]
-         * window as a "workaround", and we will update or create their right 
windows as new records come in later

Review comment:
       I think some things got accidentally changed/reverted during the rebase, 
eg this paragraph and the comment on line 250 at least

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -194,7 +220,7 @@ public void processInOrder(final K key, final V value, 
final long inputRecordTim
                     } else if (endTime > inputRecordTimestamp && startTime <= 
inputRecordTimestamp) {
                         rightWinAgg = windowBeingProcessed.value;
                         
updateWindowAndForward(windowBeingProcessed.key.window(), 
windowBeingProcessed.value, key, value, closeTime, inputRecordTimestamp);
-                    } else if (startTime == inputRecordTimestamp + 1) {

Review comment:
       Was this a rebasing accident? Seems like using `startTime` is correct

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -327,6 +382,41 @@ private void processEarly(final K key, final V value, 
final long inputRecordTime
 
         }
 
+        private void createWindows(final K key,
+                                   final V value,
+                                   final long inputRecordTimestamp,
+                                   final long closeTime,
+                                   final Set<Long> windowStartTimes,
+                                   final ValueAndTimestamp<Agg> rightWinAgg,
+                                   final ValueAndTimestamp<Agg> leftWinAgg,
+                                   final boolean leftWinAlreadyCreated,
+                                   final boolean rightWinAlreadyCreated,
+                                   final Long previousRecordTimestamp) {
+            //create right window for previous record
+            if (previousRecordTimestamp != null) {
+                final long previousRightWinStart = previousRecordTimestamp + 1;
+                if 
(previousRecordRightWindowDoesNotExistAndIsNotEmpty(windowStartTimes, 
previousRightWinStart, inputRecordTimestamp)) {
+                    createPreviousRecordRightWindow(previousRightWinStart, 
inputRecordTimestamp, key, value, closeTime);
+                }
+            }
+
+            //create left window for new record
+            if (!leftWinAlreadyCreated) {
+                final ValueAndTimestamp<Agg> valueAndTime;
+                if (leftWindowNotEmpty(previousRecordTimestamp, 
inputRecordTimestamp)) {
+                    valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
inputRecordTimestamp);
+                } else {
+                    valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
inputRecordTimestamp);
+                }
+                final TimeWindow window = new TimeWindow(inputRecordTimestamp 
- windows.timeDifferenceMs(), inputRecordTimestamp);
+                updateWindowAndForward(window, valueAndTime, key, value, 
closeTime, inputRecordTimestamp);
+            }

Review comment:
       super nit: put a line break before the right window creation block below




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to