ableegoldman commented on a change in pull request #9239: URL: https://github.com/apache/kafka/pull/9239#discussion_r485266653
########## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImplTest.java ########## @@ -246,4 +238,31 @@ private void assertOutputKeyValueTimestamp(final TestOutputTopic<Windowed<String final TestRecord<String, String> testRecord = new TestRecord<>(expectedKey, expectedValue, null, expectedTimestamp); assertThat(nonWindowedRecord, equalTo(testRecord)); } + + private void assertOutputKeyValueNotOrdered(final Set<TestRecord<String, String>> results) { Review comment: This makes it sound like you want to assert that the output is not ordered, which I don't think is the point here? Also, since you're only calling this from one place and are asserting a specific output that corresponds to a specific test, I would just inline this check in the test instead of moving it out to a new method ########## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java ########## @@ -239,52 +241,81 @@ private void doCountSlidingWindows(final MockProcessorSupplier<Windowed<String> inputTopic.pipeInput("2", "B", 1000L); inputTopic.pipeInput("3", "C", 600L); } - assertThat(supplier.theCapturedProcessor().processed(), equalTo(Arrays.asList( - // processing A@500 - new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(0L, 500L)), 1L, 500L), - // processing A@999 - new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 1L, 999L), - new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(499L, 999L)), 2L, 999L), - // processing A@600 - new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(499L, 999L)), 3L, 999L), - new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 2L, 999L), - new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(100L, 600L)), 2L, 600L), - new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 1L, 999L), - // processing B@500 - new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(0L, 
500L)), 1L, 500L), - // processing B@600 - new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 1L, 600L), - new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(100L, 600L)), 2L, 600L), - // processing B@700 - new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 2L, 700L), - new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101L)), 1L, 700L), - new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(200L, 700L)), 3L, 700L), - // processing C@501 - new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(1L, 501L)), 1L, 501L), - // processing first A@1000 - new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 3L, 1000L), - new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 2L, 1000L), - new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(1000L, 1500L)), 1L, 1000L), - new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(500L, 1000L)), 4L, 1000L), - // processing second A@1000 - new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(500L, 1000L)), 5L, 1000L), - new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 4L, 1000L), - new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 3L, 1000L), - new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(1000L, 1500L)), 2L, 1000L), - // processing first B@1000 - new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 3L, 1000L), - new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101)), 2L, 1000L), - new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(701L, 1201L)), 1L, 1000L), - new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(500L, 1000L)), 4L, 1000L), - // processing second B@1000 - new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(500L, 1000L)), 5L, 1000L), - new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 4L, 1000L), - new KeyValueTimestamp<>(new Windowed<>("2", 
new TimeWindow(601L, 1101)), 3L, 1000L), - new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(701L, 1201L)), 2L, 1000L), - // processing C@600 - new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(502L, 1002L)), 1L, 600L), - new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(100L, 600L)), 2L, 600L) + final Comparator<KeyValueTimestamp<Windowed<String>, Long>> comparator = Review comment: Why this change? ########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ########## @@ -180,41 +217,225 @@ public void processInOrder(final K key, final V value, final long timestamp) { } //create right window for previous record - if (latestLeftTypeWindow != null) { - final long rightWinStart = latestLeftTypeWindow.end() + 1; - if (!windowStartTimes.contains(rightWinStart)) { - final TimeWindow window = new TimeWindow(rightWinStart, rightWinStart + windows.timeDifferenceMs()); - final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); - putAndForward(window, valueAndTime, key, value, closeTime, timestamp); + if (previousRecordTimestamp != null) { + final long previousRightWinStart = previousRecordTimestamp + 1; + if (rightWindowNecessaryAndPossible(windowStartTimes, previousRightWinStart, timestamp)) { + createPreviousRightWindow(previousRightWinStart, timestamp, key, value, closeTime); } } //create left window for new record if (!leftWinAlreadyCreated) { - final ValueAndTimestamp<Agg> valueAndTime; - //there's a right window that the new record could create --> new record's left window is not empty - if (latestLeftTypeWindow != null) { - valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp); - } else { - valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); + createCurrentRecordLeftWindow(previousRecordTimestamp, timestamp, leftWinAgg, key, value, closeTime); + } + // create right window for new record, if necessary + 
if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) { + createCurrentRecordRightWindow(timestamp, rightWinAgg, key); + } + } + + public void processReverse(final K key, final V value, final long timestamp, final long closeTime) { + + final Set<Long> windowStartTimes = new HashSet<>(); + // aggregate that will go in the current record’s left/right window (if needed) + ValueAndTimestamp<Agg> leftWinAgg = null; + ValueAndTimestamp<Agg> rightWinAgg = null; + + //if current record's left/right windows already exist + boolean leftWinAlreadyCreated = false; + boolean rightWinAlreadyCreated = false; + + Long previousRecordTimestamp = null; + + try ( + final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.backwardFetch( + key, + key, + Math.max(0, timestamp - 2 * windows.timeDifferenceMs()), + // to catch the current record's right window, if it exists, without more calls to the store + timestamp + 1) + ) { + //if we've already seen the window with the closest start time to the record + boolean foundRightWinAgg = false; + + while (iterator.hasNext()) { + final KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next = iterator.next(); + windowStartTimes.add(next.key.window().start()); + final long startTime = next.key.window().start(); + final long endTime = startTime + windows.timeDifferenceMs(); + final long windowMaxRecordTimestamp = next.value.timestamp(); + + if (endTime > timestamp) { + if (!foundRightWinAgg) { + foundRightWinAgg = true; + rightWinAgg = next.value; + } + putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp); + } else if (endTime == timestamp) { + if (windowMaxRecordTimestamp < timestamp) { + previousRecordTimestamp = windowMaxRecordTimestamp; + } + putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp); + leftWinAlreadyCreated = true; + } else if (endTime < timestamp) { + leftWinAgg = next.value; + previousRecordTimestamp = windowMaxRecordTimestamp; + 
break; + } else { + //determine if current record's right window exists, will only be true at most once, on the first pass + rightWinAlreadyCreated = true; + } + } + } + + //create right window for previous record Review comment: Yeah, that's definitely a pretty long list of input parameters. I don't think that's necessarily a problem, but if you feel it's cleaner to just inline the window creation, then go for it. Duplicate code is not the end of the world. The only risk is that we might need to change the window creation logic and only do it in one place but not the other, but that's probably a low risk here. So just make a call 🙂 ########## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java ########## @@ -493,52 +494,59 @@ public void shouldSupportFinalResultsForSlidingWindows() { inputTopic.pipeInput("k1", "v1", 7L); // final record to advance stream time and flush windows inputTopic.pipeInput("k1", "v1", 90L); + final Comparator<TestRecord<String, Long>> comparator = Review comment: Is this change just sorting the output by window start time? Why do that vs. verifying that the output is in a specific order? 
In general the output order may not matter much but it does seem important to verify for suppress (I think 🤷♀️ ) ########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ########## @@ -161,60 +205,231 @@ public void processInOrder(final K key, final V value, final long timestamp) { windowStartTimes.add(next.key.window().start()); final long startTime = next.key.window().start(); final long endTime = startTime + windows.timeDifferenceMs(); + final long windowMaxRecordTimestamp = next.value.timestamp(); if (endTime < timestamp) { leftWinAgg = next.value; - if (isLeftWindow(next)) { - latestLeftTypeWindow = next.key.window(); - } + // update to store the previous record + previousRecordTimestamp = windowMaxRecordTimestamp; } else if (endTime == timestamp) { leftWinAlreadyCreated = true; + if (windowMaxRecordTimestamp < timestamp) { + previousRecordTimestamp = windowMaxRecordTimestamp; + } putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp); } else if (endTime > timestamp && startTime <= timestamp) { rightWinAgg = next.value; putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp); + } else if (endTime == timestamp + 1) { + rightWinAlreadyCreated = true; + } else { + throw new IllegalStateException("Unexpected window found when processing sliding windows"); + } + } + } + createWindows(key, value, timestamp, closeTime, windowStartTimes, rightWinAgg, leftWinAgg, leftWinAlreadyCreated, rightWinAlreadyCreated, previousRecordTimestamp); + } + + public void processReverse(final K key, + final V value, + final long timestamp, + final long closeTime, + final Set<Long> windowStartTimes, + ValueAndTimestamp<Agg> leftWinAgg, + ValueAndTimestamp<Agg> rightWinAgg, + boolean leftWinAlreadyCreated, + boolean rightWinAlreadyCreated, + Long previousRecordTimestamp) { + try ( + final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.backwardFetch( 
+ key, + key, + Math.max(0, timestamp - 2 * windows.timeDifferenceMs()), + // to catch the current record's right window, if it exists, without more calls to the store + timestamp + 1) + ) { + + while (iterator.hasNext()) { + final KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next = iterator.next(); + windowStartTimes.add(next.key.window().start()); + final long startTime = next.key.window().start(); + final long endTime = startTime + windows.timeDifferenceMs(); + final long windowMaxRecordTimestamp = next.value.timestamp(); + if (startTime == timestamp + 1) { + //determine if current record's right window exists, will only be true at most once, on the first pass + rightWinAlreadyCreated = true; + } else if (endTime > timestamp) { + if (rightWinAgg == null) { + rightWinAgg = next.value; + } + putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp); + } else if (endTime == timestamp) { + leftWinAlreadyCreated = true; + putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp); + if (windowMaxRecordTimestamp < timestamp) { + previousRecordTimestamp = windowMaxRecordTimestamp; + } else { + return; + } + } else if (endTime < timestamp) { + leftWinAgg = next.value; + previousRecordTimestamp = windowMaxRecordTimestamp; + break; } else { + throw new IllegalStateException("Unexpected window found when processing sliding windows"); + } + } + } + createWindows(key, value, timestamp, closeTime, windowStartTimes, rightWinAgg, leftWinAgg, leftWinAlreadyCreated, rightWinAlreadyCreated, previousRecordTimestamp); + } + + /** + * Created to handle records that have a timestamp > 0 but < timeDifference. These records would create + * windows with negative start times, which is not supported. 
Instead, they will fall within the [0, timeDifference] + * window, and we will update their right windows as new records come in later + */ + private void processEarly(final K key, final V value, final long timestamp, final long closeTime) { + ValueAndTimestamp<Agg> rightWinAgg = null; + //window from [0,timeDifference] that holds all early records + KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = null; + boolean rightWinAlreadyCreated = false; + final Set<Long> windowStartTimes = new HashSet<>(); + + Long previousRecordTimestamp = null; + + try ( + final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch( + key, + key, + Math.max(0, timestamp - 2 * windows.timeDifferenceMs()), + // to catch the current record's right window, if it exists, without more calls to the store + timestamp + 1) + ) { + KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next; + while (iterator.hasNext()) { + next = iterator.next(); + windowStartTimes.add(next.key.window().start()); + final long startTime = next.key.window().start(); + final long windowMaxRecordTimestamp = next.value.timestamp(); + + if (startTime == 0) { + combinedWindow = next; + if (windowMaxRecordTimestamp < timestamp) { + // If maxRecordTimestamp > timestamp, the current record is out-of-order, meaning that the + // previous record's right window would have been created already by other records. This + // will always be true for early records, as they all fall within [0, timeDifferenceMs]. 
+ previousRecordTimestamp = windowMaxRecordTimestamp; + } + + } else if (startTime <= timestamp) { + rightWinAgg = next.value; + putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp); + } else if (startTime == timestamp + 1) { rightWinAlreadyCreated = true; } } } + // if there wasn't a right window agg found and we need a right window for our new record, + // the current aggregate in the combined window will go in the new record's right window + if (rightWinAgg == null && combinedWindow != null && combinedWindow.value.timestamp() > timestamp) { + rightWinAgg = combinedWindow.value; + } + + //create right window for new record if needed + if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) { + createCurrentRecordRightWindow(timestamp, rightWinAgg, key); + } + + //create the right window for the previous record if the previous record exists and the window hasn't already been created + if (previousRecordTimestamp != null && !windowStartTimes.contains(previousRecordTimestamp + 1)) { + createPreviousRecordRightWindow(previousRecordTimestamp + 1, timestamp, key, value, closeTime); + } + + if (combinedWindow == null) { + final TimeWindow window = new TimeWindow(0, windows.timeDifferenceMs()); + final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); + putAndForward(window, valueAndTime, key, value, closeTime, timestamp); + + } else { + //update the combined window with the new aggregate + putAndForward(combinedWindow.key.window(), combinedWindow.value, key, value, closeTime, timestamp); + } + + } + + private void createWindows(final K key, + final V value, + final long timestamp, Review comment: nit: rename to `inputRecordTimestamp` to be consistent with the naming in the early records PR ---------------------------------------------------------------- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org