ableegoldman commented on a change in pull request #9239:
URL: https://github.com/apache/kafka/pull/9239#discussion_r485266653



##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImplTest.java
##########
@@ -246,4 +238,31 @@ private void assertOutputKeyValueTimestamp(final 
TestOutputTopic<Windowed<String
         final TestRecord<String, String> testRecord = new 
TestRecord<>(expectedKey, expectedValue, null, expectedTimestamp);
         assertThat(nonWindowedRecord, equalTo(testRecord));
     }
+
+    private void assertOutputKeyValueNotOrdered(final Set<TestRecord<String, 
String>> results) {

Review comment:
       This makes it sound like you want to assert that the output is not 
ordered, which I don't think is the point here?
   
   Also, since you're only calling this from one place and are asserting a 
specific output that corresponds to a specific test, I would just inline this 
check in the test instead of moving it out into a new method.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
##########
@@ -239,52 +241,81 @@ private void doCountSlidingWindows(final  
MockProcessorSupplier<Windowed<String>
             inputTopic.pipeInput("2", "B", 1000L);
             inputTopic.pipeInput("3", "C", 600L);
         }
-        assertThat(supplier.theCapturedProcessor().processed(), 
equalTo(Arrays.asList(
-                // processing A@500
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(0L, 
500L)), 1L, 500L),
-                // processing A@999
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(501L, 1001L)), 1L, 999L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(499L, 999L)), 2L, 999L),
-                // processing A@600
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(499L, 999L)), 3L, 999L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(501L, 1001L)), 2L, 999L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(100L, 600L)), 2L, 600L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(601L, 1101L)), 1L, 999L),
-                // processing B@500
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(0L, 
500L)), 1L, 500L),
-                // processing B@600
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(501L, 1001L)), 1L, 600L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(100L, 600L)), 2L, 600L),
-                // processing B@700
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(501L, 1001L)), 2L, 700L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(601L, 1101L)), 1L, 700L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(200L, 700L)), 3L, 700L),
-                // processing C@501
-                new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(1L, 
501L)), 1L, 501L),
-                // processing first A@1000
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(501L, 1001L)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(601L, 1101L)), 2L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(1000L, 1500L)), 1L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(500L, 1000L)), 4L, 1000L),
-                // processing second A@1000
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(500L, 1000L)), 5L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(501L, 1001L)), 4L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(601L, 1101L)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(1000L, 1500L)), 2L, 1000L),
-                // processing first B@1000
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(501L, 1001L)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(601L, 1101)), 2L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(701L, 1201L)), 1L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(500L, 1000L)), 4L, 1000L),
-                // processing second B@1000
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(500L, 1000L)), 5L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(501L, 1001L)), 4L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(601L, 1101)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(701L, 1201L)), 2L, 1000L),
-                // processing C@600
-                new KeyValueTimestamp<>(new Windowed<>("3", new 
TimeWindow(502L, 1002L)), 1L, 600L),
-                new KeyValueTimestamp<>(new Windowed<>("3", new 
TimeWindow(100L, 600L)), 2L, 600L)
 
+        final Comparator<KeyValueTimestamp<Windowed<String>, Long>> comparator 
=

Review comment:
       Why this change?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -180,41 +217,225 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
             }
 
             //create right window for previous record
-            if (latestLeftTypeWindow != null) {
-                final long rightWinStart = latestLeftTypeWindow.end() + 1;
-                if (!windowStartTimes.contains(rightWinStart)) {
-                    final TimeWindow window = new TimeWindow(rightWinStart, 
rightWinStart + windows.timeDifferenceMs());
-                    final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(initializer.apply(), timestamp);
-                    putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
+            if (previousRecordTimestamp != null) {
+                final long previousRightWinStart = previousRecordTimestamp + 1;
+                if (rightWindowNecessaryAndPossible(windowStartTimes, 
previousRightWinStart, timestamp)) {
+                    createPreviousRightWindow(previousRightWinStart, 
timestamp, key, value, closeTime);
                 }
             }
 
             //create left window for new record
             if (!leftWinAlreadyCreated) {
-                final ValueAndTimestamp<Agg> valueAndTime;
-                //there's a right window that the new record could create --> 
new record's left window is not empty
-                if (latestLeftTypeWindow != null) {
-                    valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
-                } else {
-                    valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
+                createCurrentRecordLeftWindow(previousRecordTimestamp, 
timestamp, leftWinAgg, key, value, closeTime);
+            }
+            // create right window for new record, if necessary
+            if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
+                createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+            }
+        }
+
+        public void processReverse(final K key, final V value, final long 
timestamp, final long closeTime) {
+
+            final Set<Long> windowStartTimes = new HashSet<>();
+            // aggregate that will go in the current record’s left/right 
window (if needed)
+            ValueAndTimestamp<Agg> leftWinAgg = null;
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+
+            //if current record's left/right windows already exist
+            boolean leftWinAlreadyCreated = false;
+            boolean rightWinAlreadyCreated = false;
+
+            Long previousRecordTimestamp = null;
+
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> 
iterator = windowStore.backwardFetch(
+                    key,
+                    key,
+                    Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+                    // to catch the current record's right window, if it 
exists, without more calls to the store
+                    timestamp + 1)
+            ) {
+                //if we've already seen the window with the closest start time 
to the record
+                boolean foundRightWinAgg = false;
+
+                while (iterator.hasNext()) {
+                    final KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next = 
iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+                    final long startTime = next.key.window().start();
+                    final long endTime = startTime + 
windows.timeDifferenceMs();
+                    final long windowMaxRecordTimestamp = 
next.value.timestamp();
+
+                    if (endTime > timestamp) {
+                        if (!foundRightWinAgg) {
+                            foundRightWinAgg = true;
+                            rightWinAgg = next.value;
+                        }
+                        putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+                    } else if (endTime == timestamp) {
+                        if (windowMaxRecordTimestamp < timestamp) {
+                            previousRecordTimestamp = windowMaxRecordTimestamp;
+                        }
+                        putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+                        leftWinAlreadyCreated = true;
+                    } else if (endTime < timestamp) {
+                        leftWinAgg = next.value;
+                        previousRecordTimestamp = windowMaxRecordTimestamp;
+                        break;
+                    } else {
+                        //determine if current record's right window exists, 
will only be true at most once, on the first pass
+                        rightWinAlreadyCreated = true;
+                    }
+                }
+            }
+
+            //create right window for previous record

Review comment:
       Yeah, that's definitely a pretty long list of input parameters. I don't 
think that's necessarily a problem, but if you feel it's cleaner to just inline 
the window creation, then go for it. 
   
   Duplicate code is not the end of the world. The only risk is that we might 
later need to change the window creation logic and update it in one place but 
forget the other, but that's probably a low risk here. So just make a call 🙂 

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
##########
@@ -493,52 +494,59 @@ public void shouldSupportFinalResultsForSlidingWindows() {
             inputTopic.pipeInput("k1", "v1", 7L);
             // final record to advance stream time and flush windows
             inputTopic.pipeInput("k1", "v1", 90L);
+            final Comparator<TestRecord<String, Long>> comparator =

Review comment:
       Is this change just sorting the output by window start time? Why do that 
instead of verifying that the output is in a specific order? In general the 
output order may not matter much, but it does seem important to verify it for 
suppress (I think 🤷‍♀️ ).

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -161,60 +205,231 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
                     windowStartTimes.add(next.key.window().start());
                     final long startTime = next.key.window().start();
                     final long endTime = startTime + 
windows.timeDifferenceMs();
+                    final long windowMaxRecordTimestamp = 
next.value.timestamp();
 
                     if (endTime < timestamp) {
                         leftWinAgg = next.value;
-                        if (isLeftWindow(next)) {
-                            latestLeftTypeWindow = next.key.window();
-                        }
+                        // update to store the previous record
+                        previousRecordTimestamp = windowMaxRecordTimestamp;
                     } else if (endTime == timestamp) {
                         leftWinAlreadyCreated = true;
+                        if (windowMaxRecordTimestamp < timestamp) {
+                            previousRecordTimestamp = windowMaxRecordTimestamp;
+                        }
                         putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
                     } else if (endTime > timestamp && startTime <= timestamp) {
                         rightWinAgg = next.value;
                         putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+                    } else if (endTime == timestamp + 1) {
+                        rightWinAlreadyCreated = true;
+                    } else {
+                        throw new IllegalStateException("Unexpected window 
found when processing sliding windows");
+                    }
+                }
+            }
+            createWindows(key, value, timestamp, closeTime, windowStartTimes, 
rightWinAgg, leftWinAgg, leftWinAlreadyCreated, rightWinAlreadyCreated, 
previousRecordTimestamp);
+        }
+
+        public void processReverse(final K key,
+                                   final V value,
+                                   final long timestamp,
+                                   final long closeTime,
+                                   final Set<Long> windowStartTimes,
+                                   ValueAndTimestamp<Agg> leftWinAgg,
+                                   ValueAndTimestamp<Agg> rightWinAgg,
+                                   boolean leftWinAlreadyCreated,
+                                   boolean rightWinAlreadyCreated,
+                                   Long previousRecordTimestamp) {
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> 
iterator = windowStore.backwardFetch(
+                    key,
+                    key,
+                    Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+                    // to catch the current record's right window, if it 
exists, without more calls to the store
+                    timestamp + 1)
+            ) {
+
+                while (iterator.hasNext()) {
+                    final KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next = 
iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+                    final long startTime = next.key.window().start();
+                    final long endTime = startTime + 
windows.timeDifferenceMs();
+                    final long windowMaxRecordTimestamp = 
next.value.timestamp();
+                    if (startTime == timestamp + 1) {
+                        //determine if current record's right window exists, 
will only be true at most once, on the first pass
+                        rightWinAlreadyCreated = true;
+                    } else if (endTime > timestamp) {
+                        if (rightWinAgg == null) {
+                            rightWinAgg = next.value;
+                        }
+                        putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+                    } else if (endTime == timestamp) {
+                        leftWinAlreadyCreated = true;
+                        putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+                        if (windowMaxRecordTimestamp < timestamp) {
+                            previousRecordTimestamp = windowMaxRecordTimestamp;
+                        } else {
+                            return;
+                        }
+                    } else if (endTime < timestamp) {
+                        leftWinAgg = next.value;
+                        previousRecordTimestamp = windowMaxRecordTimestamp;
+                        break;
                     } else {
+                        throw new IllegalStateException("Unexpected window 
found when processing sliding windows");
+                    }
+                }
+            }
+            createWindows(key, value, timestamp, closeTime, windowStartTimes, 
rightWinAgg, leftWinAgg, leftWinAlreadyCreated, rightWinAlreadyCreated, 
previousRecordTimestamp);
+        }
+
+        /**
+         * Created to handle records that have a timestamp > 0 but < 
timeDifference. These records would create
+         * windows with negative start times, which is not supported. Instead, 
they will fall within the [0, timeDifference]
+         * window, and we will update their right windows as new records come 
in later
+         */
+        private void processEarly(final K key, final V value, final long 
timestamp, final long closeTime) {
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+            //window from [0,timeDifference] that holds all early records
+            KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = 
null;
+            boolean rightWinAlreadyCreated = false;
+            final Set<Long> windowStartTimes = new HashSet<>();
+
+            Long previousRecordTimestamp = null;
+
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> 
iterator = windowStore.fetch(
+                    key,
+                    key,
+                    Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+                    // to catch the current record's right window, if it 
exists, without more calls to the store
+                    timestamp + 1)
+            ) {
+                KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+                while (iterator.hasNext()) {
+                    next = iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+                    final long startTime = next.key.window().start();
+                    final long windowMaxRecordTimestamp = 
next.value.timestamp();
+
+                    if (startTime == 0) {
+                        combinedWindow = next;
+                        if (windowMaxRecordTimestamp < timestamp) {
+                            // If maxRecordTimestamp > timestamp, the current 
record is out-of-order, meaning that the
+                            // previous record's right window would have been 
created already by other records. This
+                            // will always be true for early records, as they 
all fall within [0, timeDifferenceMs].
+                            previousRecordTimestamp = windowMaxRecordTimestamp;
+                        }
+
+                    } else if (startTime <= timestamp) {
+                        rightWinAgg = next.value;
+                        putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+                    } else if (startTime == timestamp + 1) {
                         rightWinAlreadyCreated = true;
                     }
                 }
             }
 
+            // if there wasn't a right window agg found and we need a right 
window for our new record,
+            // the current aggregate in the combined window will go in the new 
record's right window
+            if (rightWinAgg == null && combinedWindow != null && 
combinedWindow.value.timestamp() > timestamp) {
+                rightWinAgg = combinedWindow.value;
+            }
+
+            //create right window for new record if needed
+            if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
+                createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+            }
+
+            //create the right window for the previous record if the previous 
record exists and the window hasn't already been created
+            if (previousRecordTimestamp != null && 
!windowStartTimes.contains(previousRecordTimestamp + 1)) {
+                createPreviousRecordRightWindow(previousRecordTimestamp + 1, 
timestamp, key, value, closeTime);
+            }
+
+            if (combinedWindow == null) {
+                final TimeWindow window = new TimeWindow(0, 
windows.timeDifferenceMs());
+                final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(initializer.apply(), timestamp);
+                putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
+
+            } else {
+                //update the combined window with the new aggregate
+                putAndForward(combinedWindow.key.window(), 
combinedWindow.value, key, value, closeTime, timestamp);
+            }
+
+        }
+
+        private void createWindows(final K key,
+                                   final V value,
+                                   final long timestamp,

Review comment:
       nit: rename to `inputRecordTimestamp` to be consistent with the naming 
in the early records PR.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to