ableegoldman commented on a change in pull request #9157: URL: https://github.com/apache/kafka/pull/9157#discussion_r481338030
########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ########## @@ -164,11 +161,13 @@ public void processInOrder(final K key, final V value, final long timestamp) { if (endTime < timestamp) { leftWinAgg = next.value; - if (isLeftWindow(next)) { - latestLeftTypeWindow = next.key.window(); - } + // update to store the previous record Review comment: This comment doesn't really add anything; it just describes what the code already says. Also, don't we need to check that `windowMaxTimestamp > previousRecordTimestamp` before updating `previousRecordTimestamp` (where `windowMaxTimestamp = next.value.timestamp` -- it would be nice to assign this to a variable with an explicit name to make it clear what `next.value.timestamp` actually means)? The same goes for the code below; I guess you could just put the check in a `maybeUpdatePreviousRecordTimestamp()` method and call it from both places. ########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ########## @@ -146,13 +142,14 @@ public void processInOrder(final K key, final V value, final long timestamp) { boolean leftWinAlreadyCreated = false; boolean rightWinAlreadyCreated = false; - // keep the left type window closest to the record - Window latestLeftTypeWindow = null; + // Store the previous record + Long previousRecord = null; Review comment: Use `previousRecordTimestamp` like in `processEarly`. 
You can probably remove the comment then ########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ########## @@ -192,29 +191,113 @@ public void processInOrder(final K key, final V value, final long timestamp) { //create left window for new record if (!leftWinAlreadyCreated) { final ValueAndTimestamp<Agg> valueAndTime; - //there's a right window that the new record could create --> new record's left window is not empty - if (latestLeftTypeWindow != null) { + // if there's a right window that the new record could create && previous record falls within left window -> new record's left window is not empty + if (previousRecord != null && leftWindowNotEmpty(previousRecord, timestamp)) { valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp); } else { valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); } final TimeWindow window = new TimeWindow(timestamp - windows.timeDifferenceMs(), timestamp); putAndForward(window, valueAndTime, key, value, closeTime, timestamp); } - //create right window for new record if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) { - final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs()); - final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp)); + createRightWindow(timestamp, rightWinAgg, key, value, closeTime); + } + } + + /** + * Created to handle records that have a timestamp > 0 but < timeDifference. These records would create + * windows with negative start times, which is not supported. 
Instead, they will fall within the [0, timeDifference] + * window, and we will update their right windows as new records come in later + */ + private void processEarly(final K key, final V value, final long timestamp, final long closeTime) { + ValueAndTimestamp<Agg> rightWinAgg = null; + //window from [0,timeDifference] that holds all early records Review comment: ```suggestion // A window from [0, timeDifferenceMs] that holds all early records ``` Also I'd suggest putting the `combinedWindow` declaration (and comment) above `rightWinAgg` to avoid ambiguity in what the comment refers to ########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ########## @@ -192,29 +191,113 @@ public void processInOrder(final K key, final V value, final long timestamp) { //create left window for new record if (!leftWinAlreadyCreated) { final ValueAndTimestamp<Agg> valueAndTime; - //there's a right window that the new record could create --> new record's left window is not empty - if (latestLeftTypeWindow != null) { + // if there's a right window that the new record could create && previous record falls within left window -> new record's left window is not empty + if (previousRecord != null && leftWindowNotEmpty(previousRecord, timestamp)) { valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp); } else { valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); } final TimeWindow window = new TimeWindow(timestamp - windows.timeDifferenceMs(), timestamp); putAndForward(window, valueAndTime, key, value, closeTime, timestamp); } - //create right window for new record if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) { - final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs()); - final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp)); + 
createRightWindow(timestamp, rightWinAgg, key, value, closeTime); + } + } + + /** + * Created to handle records that have a timestamp > 0 but < timeDifference. These records would create + * windows with negative start times, which is not supported. Instead, they will fall within the [0, timeDifference] + * window, and we will update their right windows as new records come in later Review comment: update, or create? (or both?) ########## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java ########## @@ -239,6 +239,7 @@ private void doCountSlidingWindows(final MockProcessorSupplier<Windowed<String> inputTopic.pipeInput("2", "B", 1000L); inputTopic.pipeInput("3", "C", 600L); } + Review comment: nit: remove this added line ########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ########## @@ -192,29 +191,113 @@ public void processInOrder(final K key, final V value, final long timestamp) { //create left window for new record if (!leftWinAlreadyCreated) { final ValueAndTimestamp<Agg> valueAndTime; - //there's a right window that the new record could create --> new record's left window is not empty - if (latestLeftTypeWindow != null) { + // if there's a right window that the new record could create && previous record falls within left window -> new record's left window is not empty + if (previousRecord != null && leftWindowNotEmpty(previousRecord, timestamp)) { valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp); } else { valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); } final TimeWindow window = new TimeWindow(timestamp - windows.timeDifferenceMs(), timestamp); putAndForward(window, valueAndTime, key, value, closeTime, timestamp); } - //create right window for new record if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) { - final TimeWindow window = new TimeWindow(timestamp + 1, 
timestamp + 1 + windows.timeDifferenceMs()); - final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp)); + createRightWindow(timestamp, rightWinAgg, key, value, closeTime); + } + } + + /** + * Created to handle records that have a timestamp > 0 but < timeDifference. These records would create + * windows with negative start times, which is not supported. Instead, they will fall within the [0, timeDifference] Review comment: ```suggestion * windows with negative start times, which is not supported. Instead, they will fall within the [0, timeDifferenceMs] ``` ########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ########## @@ -192,29 +191,113 @@ public void processInOrder(final K key, final V value, final long timestamp) { //create left window for new record if (!leftWinAlreadyCreated) { final ValueAndTimestamp<Agg> valueAndTime; - //there's a right window that the new record could create --> new record's left window is not empty - if (latestLeftTypeWindow != null) { + // if there's a right window that the new record could create && previous record falls within left window -> new record's left window is not empty + if (previousRecord != null && leftWindowNotEmpty(previousRecord, timestamp)) { valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp); } else { valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); } final TimeWindow window = new TimeWindow(timestamp - windows.timeDifferenceMs(), timestamp); putAndForward(window, valueAndTime, key, value, closeTime, timestamp); } - //create right window for new record if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) { - final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs()); - final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp)); + createRightWindow(timestamp, rightWinAgg, key, value, closeTime); + } + } + + /** + * Created to handle records that have a timestamp > 0 but < timeDifference. These records would create + * windows with negative start times, which is not supported. Instead, they will fall within the [0, timeDifference] + * window, and we will update their right windows as new records come in later + */ + private void processEarly(final K key, final V value, final long timestamp, final long closeTime) { + ValueAndTimestamp<Agg> rightWinAgg = null; + //window from [0,timeDifference] that holds all early records + KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = null; + boolean rightWinAlreadyCreated = false; + final Set<Long> windowStartTimes = new HashSet<>(); + + Long previousRecordTimestamp = null; + + try ( + final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch( + key, + key, + Math.max(0, timestamp - 2 * windows.timeDifferenceMs()), + // to catch the current record's right window, if it exists, without more calls to the store + timestamp + 1) + ) { + KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next; + while (iterator.hasNext()) { + next = iterator.next(); + windowStartTimes.add(next.key.window().start()); + final long startTime = next.key.window().start(); + + if (startTime == 0) { + combinedWindow = next; + if (next.value.timestamp() < timestamp) { + previousRecordTimestamp = next.value.timestamp(); + } Review comment: I think we should always assign the `next.value.timestamp` value to a variable with an explicit name, eg `windowMaxRecordTimestamp`, because it's pretty non-obvious what it means and easy to forget ########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ########## @@ -192,29 +191,113 @@ public void processInOrder(final K key, final V value, final 
long timestamp) { //create left window for new record if (!leftWinAlreadyCreated) { final ValueAndTimestamp<Agg> valueAndTime; - //there's a right window that the new record could create --> new record's left window is not empty - if (latestLeftTypeWindow != null) { + // if there's a right window that the new record could create && previous record falls within left window -> new record's left window is not empty + if (previousRecord != null && leftWindowNotEmpty(previousRecord, timestamp)) { valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp); } else { valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); } final TimeWindow window = new TimeWindow(timestamp - windows.timeDifferenceMs(), timestamp); putAndForward(window, valueAndTime, key, value, closeTime, timestamp); } - //create right window for new record if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) { - final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs()); - final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp)); + createRightWindow(timestamp, rightWinAgg, key, value, closeTime); + } + } + + /** + * Created to handle records that have a timestamp > 0 but < timeDifference. These records would create + * windows with negative start times, which is not supported. 
Instead, they will fall within the [0, timeDifference] + * window, and we will update their right windows as new records come in later + */ + private void processEarly(final K key, final V value, final long timestamp, final long closeTime) { + ValueAndTimestamp<Agg> rightWinAgg = null; + //window from [0,timeDifference] that holds all early records + KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = null; + boolean rightWinAlreadyCreated = false; + final Set<Long> windowStartTimes = new HashSet<>(); + + Long previousRecordTimestamp = null; + + try ( + final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch( + key, + key, + Math.max(0, timestamp - 2 * windows.timeDifferenceMs()), + // to catch the current record's right window, if it exists, without more calls to the store + timestamp + 1) + ) { + KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next; + while (iterator.hasNext()) { + next = iterator.next(); + windowStartTimes.add(next.key.window().start()); + final long startTime = next.key.window().start(); + + if (startTime == 0) { + combinedWindow = next; + if (next.value.timestamp() < timestamp) { + previousRecordTimestamp = next.value.timestamp(); + } + + } else if (startTime <= timestamp) { + rightWinAgg = next.value; + putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp); Review comment: I think we should add a comment somewhere clarifying some things about how we set the `previousRecordTimestamp` in `processEarly`: Basically, we only need to check and maybe set it when we're on the combined window, because if it's still null when we're past the combined window then we know there was a record greater than the current record in the combined window already, and in that case we must have already created the right window for the actual previous record. 
Hopefully you can find a better & more concise way to explain that 😄 ########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ########## @@ -192,29 +191,113 @@ public void processInOrder(final K key, final V value, final long timestamp) { //create left window for new record if (!leftWinAlreadyCreated) { final ValueAndTimestamp<Agg> valueAndTime; - //there's a right window that the new record could create --> new record's left window is not empty - if (latestLeftTypeWindow != null) { + // if there's a right window that the new record could create && previous record falls within left window -> new record's left window is not empty + if (previousRecord != null && leftWindowNotEmpty(previousRecord, timestamp)) { valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp); } else { valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); } final TimeWindow window = new TimeWindow(timestamp - windows.timeDifferenceMs(), timestamp); putAndForward(window, valueAndTime, key, value, closeTime, timestamp); } - //create right window for new record if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) { - final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs()); - final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp)); + createRightWindow(timestamp, rightWinAgg, key, value, closeTime); + } + } + + /** + * Created to handle records that have a timestamp > 0 but < timeDifference. These records would create Review comment: ```suggestion * Created to handle records where 0 < timestamp < timeDifferenceMs. 
These records would create ``` ########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ########## @@ -135,7 +127,11 @@ public void processInOrder(final K key, final V value, final long timestamp) { observedStreamTime = Math.max(observedStreamTime, timestamp); final long closeTime = observedStreamTime - windows.gracePeriodMs(); - //store start times of windows we find + if (timestamp < windows.timeDifferenceMs()) { + processEarly(key, value, timestamp, closeTime); + return; Review comment: Seems like we should move this above into the top-level `process` instead of first calling `processInOrder` and then calling `processEarly`. For one thing, since we actually do need to iterate the full range for the early records, we can just call `processEarly` without having to decide between `processInOrder` and `processReverse` ########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ########## @@ -192,29 +191,113 @@ public void processInOrder(final K key, final V value, final long timestamp) { //create left window for new record if (!leftWinAlreadyCreated) { final ValueAndTimestamp<Agg> valueAndTime; - //there's a right window that the new record could create --> new record's left window is not empty - if (latestLeftTypeWindow != null) { + // if there's a right window that the new record could create && previous record falls within left window -> new record's left window is not empty + if (previousRecord != null && leftWindowNotEmpty(previousRecord, timestamp)) { valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp); } else { valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); } final TimeWindow window = new TimeWindow(timestamp - windows.timeDifferenceMs(), timestamp); putAndForward(window, valueAndTime, key, value, closeTime, timestamp); } - //create right window for new record if (!rightWinAlreadyCreated && 
rightWindowIsNotEmpty(rightWinAgg, timestamp)) { - final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs()); - final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp)); + createRightWindow(timestamp, rightWinAgg, key, value, closeTime); Review comment: This doesn't look right. Why would we need to pass in the `key` and `value` to `createRightWindow`? The distinguishing feature of the current record's right window is that it doesn't include the current record at all. I see that `createRightWindow` ultimately calls `putAndForward`, which takes a key and value, but that just seems misleading. I think we should either pass in `null` to `putAndForward` for things we don't need, or better yet (imo) don't use `putAndForward` for the right window creation and just have a clean separation between creation of the right window and everything else. ########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ########## @@ -192,29 +191,113 @@ public void processInOrder(final K key, final V value, final long timestamp) { //create left window for new record if (!leftWinAlreadyCreated) { final ValueAndTimestamp<Agg> valueAndTime; - //there's a right window that the new record could create --> new record's left window is not empty - if (latestLeftTypeWindow != null) { + // if there's a right window that the new record could create && previous record falls within left window -> new record's left window is not empty + if (previousRecord != null && leftWindowNotEmpty(previousRecord, timestamp)) { valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp); } else { valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); } final TimeWindow window = new TimeWindow(timestamp - windows.timeDifferenceMs(), timestamp); putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp); } - //create right window for new record if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) { - final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs()); - final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp)); + createRightWindow(timestamp, rightWinAgg, key, value, closeTime); + } + } + + /** + * Created to handle records that have a timestamp > 0 but < timeDifference. These records would create + * windows with negative start times, which is not supported. Instead, they will fall within the [0, timeDifference] + * window, and we will update their right windows as new records come in later + */ + private void processEarly(final K key, final V value, final long timestamp, final long closeTime) { + ValueAndTimestamp<Agg> rightWinAgg = null; + //window from [0,timeDifference] that holds all early records + KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = null; + boolean rightWinAlreadyCreated = false; + final Set<Long> windowStartTimes = new HashSet<>(); + + Long previousRecordTimestamp = null; + + try ( + final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch( + key, + key, + Math.max(0, timestamp - 2 * windows.timeDifferenceMs()), + // to catch the current record's right window, if it exists, without more calls to the store + timestamp + 1) + ) { + KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next; + while (iterator.hasNext()) { + next = iterator.next(); + windowStartTimes.add(next.key.window().start()); + final long startTime = next.key.window().start(); + + if (startTime == 0) { + combinedWindow = next; + if (next.value.timestamp() < timestamp) { + previousRecordTimestamp = next.value.timestamp(); + } + + } else if (startTime <= timestamp) { + rightWinAgg = next.value; + putAndForward(next.key.window(), next.value, key, value, closeTime, 
timestamp); + } else if (startTime == timestamp + 1) { + rightWinAlreadyCreated = true; + } + } + } + + // if there wasn't a right window agg found and we need a right window for our new record, + // the current aggregate in the combined window will go in the new record's right window + if (rightWinAgg == null && combinedWindow != null && combinedWindow.value.timestamp() > timestamp) { Review comment: Just leaving a note to the next reviewer, and making sure I understand this myself: If we didn't find a right window agg, that means one of two things. 1) There were no other windows (besides the combined window) with startTime <= timestamp, which means there are no records earlier than the current one, and no records later than timeDifference (but within range of the current record). We take that to mean that whatever is in the combined window is an aggregate of records that are all to the right of the current record. Note that there could be a record at the same timestamp as the current record, but we will check for that before actually creating the right window below (by checking `!rightWinAlreadyCreated`). 2) There is just a single record in the combined window that is earlier than the current record. We check for that with the `combinedWindow.value.timestamp() > timestamp` condition, and in that case we create no right window. By the way, can we move the creation of the right window to just after this block, to keep all the relevant logic together? 
########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ########## @@ -192,29 +191,113 @@ public void processInOrder(final K key, final V value, final long timestamp) { //create left window for new record if (!leftWinAlreadyCreated) { final ValueAndTimestamp<Agg> valueAndTime; - //there's a right window that the new record could create --> new record's left window is not empty - if (latestLeftTypeWindow != null) { + // if there's a right window that the new record could create && previous record falls within left window -> new record's left window is not empty + if (previousRecord != null && leftWindowNotEmpty(previousRecord, timestamp)) { valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp); } else { valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); } final TimeWindow window = new TimeWindow(timestamp - windows.timeDifferenceMs(), timestamp); putAndForward(window, valueAndTime, key, value, closeTime, timestamp); } - //create right window for new record if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) { - final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs()); - final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp)); + createRightWindow(timestamp, rightWinAgg, key, value, closeTime); + } + } + + /** + * Created to handle records that have a timestamp > 0 but < timeDifference. These records would create + * windows with negative start times, which is not supported. 
Instead, they will fall within the [0, timeDifference] + * window, and we will update their right windows as new records come in later + */ + private void processEarly(final K key, final V value, final long timestamp, final long closeTime) { + ValueAndTimestamp<Agg> rightWinAgg = null; + //window from [0,timeDifference] that holds all early records + KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = null; + boolean rightWinAlreadyCreated = false; + final Set<Long> windowStartTimes = new HashSet<>(); + + Long previousRecordTimestamp = null; + + try ( + final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch( + key, + key, + Math.max(0, timestamp - 2 * windows.timeDifferenceMs()), + // to catch the current record's right window, if it exists, without more calls to the store + timestamp + 1) + ) { + KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next; + while (iterator.hasNext()) { + next = iterator.next(); + windowStartTimes.add(next.key.window().start()); + final long startTime = next.key.window().start(); + + if (startTime == 0) { + combinedWindow = next; + if (next.value.timestamp() < timestamp) { + previousRecordTimestamp = next.value.timestamp(); + } + + } else if (startTime <= timestamp) { + rightWinAgg = next.value; + putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp); + } else if (startTime == timestamp + 1) { + rightWinAlreadyCreated = true; + } + } + } + + // if there wasn't a right window agg found and we need a right window for our new record, + // the current aggregate in the combined window will go in the new record's right window + if (rightWinAgg == null && combinedWindow != null && combinedWindow.value.timestamp() > timestamp) { + rightWinAgg = combinedWindow.value; + } + + if (combinedWindow == null) { + final TimeWindow window = new TimeWindow(0, windows.timeDifferenceMs()); + final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); 
putAndForward(window, valueAndTime, key, value, closeTime, timestamp); + + } else { + //create the right window for the previous record if the previous record exists and the window hasn't already been created + if (previousRecordTimestamp != null && !windowStartTimes.contains(previousRecordTimestamp + 1)) { + final TimeWindow window = new TimeWindow(previousRecordTimestamp + 1, previousRecordTimestamp + 1 + windows.timeDifferenceMs()); + final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); + putAndForward(window, valueAndTime, key, value, closeTime, timestamp); + } + //update the combined window with the new aggregate + putAndForward(combinedWindow.key.window(), combinedWindow.value, key, value, closeTime, timestamp); + } + //create right window for new record if needed + if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) { + createRightWindow(timestamp, rightWinAgg, key, value, closeTime); } } - private boolean rightWindowIsNotEmpty(final ValueAndTimestamp<Agg> rightWinAgg, final long timestamp) { - return rightWinAgg != null && rightWinAgg.timestamp() > timestamp; + private void createRightWindow(final long timestamp, + final ValueAndTimestamp<Agg> rightWinAgg, + final K key, + final V value, + final long closeTime) { + final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs()); + final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp)); + putAndForward(window, valueAndTime, key, value, closeTime, timestamp); Review comment: It's pretty weird to have to pass in `key` and `value` when they're not used to create the right window. I notice they're used in the log message for dropped late windows actually, but it seems odd that we should ever end up dropping the right window of the current record. 
If the record itself is that old, we should just drop it before even processing it, right? Assuming you do that, then it feels a lot more reasonable to not call `putAndForward` from `createRightWindow` at all, and just do the actual putting and forwarding for the right window case inline. ########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ########## @@ -192,29 +191,113 @@ public void processInOrder(final K key, final V value, final long timestamp) { //create left window for new record if (!leftWinAlreadyCreated) { final ValueAndTimestamp<Agg> valueAndTime; - //there's a right window that the new record could create --> new record's left window is not empty - if (latestLeftTypeWindow != null) { + // if there's a right window that the new record could create && previous record falls within left window -> new record's left window is not empty + if (previousRecord != null && leftWindowNotEmpty(previousRecord, timestamp)) { valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp); } else { valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); } final TimeWindow window = new TimeWindow(timestamp - windows.timeDifferenceMs(), timestamp); putAndForward(window, valueAndTime, key, value, closeTime, timestamp); } - //create right window for new record if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) { - final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs()); - final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp)); + createRightWindow(timestamp, rightWinAgg, key, value, closeTime); + } + } + + /** + * Created to handle records that have a timestamp > 0 but < timeDifference. These records would create + * windows with negative start times, which is not supported. 
Instead, they will fall within the [0, timeDifference] + * window, and we will update their right windows as new records come in later + */ + private void processEarly(final K key, final V value, final long timestamp, final long closeTime) { + ValueAndTimestamp<Agg> rightWinAgg = null; + //window from [0,timeDifference] that holds all early records + KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = null; + boolean rightWinAlreadyCreated = false; + final Set<Long> windowStartTimes = new HashSet<>(); + + Long previousRecordTimestamp = null; + + try ( + final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch( + key, + key, + Math.max(0, timestamp - 2 * windows.timeDifferenceMs()), + // to catch the current record's right window, if it exists, without more calls to the store + timestamp + 1) + ) { + KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next; + while (iterator.hasNext()) { + next = iterator.next(); + windowStartTimes.add(next.key.window().start()); + final long startTime = next.key.window().start(); + + if (startTime == 0) { + combinedWindow = next; + if (next.value.timestamp() < timestamp) { + previousRecordTimestamp = next.value.timestamp(); + } + + } else if (startTime <= timestamp) { + rightWinAgg = next.value; + putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp); + } else if (startTime == timestamp + 1) { + rightWinAlreadyCreated = true; + } + } + } + + // if there wasn't a right window agg found and we need a right window for our new record, + // the current aggregate in the combined window will go in the new record's right window + if (rightWinAgg == null && combinedWindow != null && combinedWindow.value.timestamp() > timestamp) { + rightWinAgg = combinedWindow.value; + } + + if (combinedWindow == null) { + final TimeWindow window = new TimeWindow(0, windows.timeDifferenceMs()); + final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); 
putAndForward(window, valueAndTime, key, value, closeTime, timestamp); + + } else { + //create the right window for the previous record if the previous record exists and the window hasn't already been created + if (previousRecordTimestamp != null && !windowStartTimes.contains(previousRecordTimestamp + 1)) { + final TimeWindow window = new TimeWindow(previousRecordTimestamp + 1, previousRecordTimestamp + 1 + windows.timeDifferenceMs()); + final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); + putAndForward(window, valueAndTime, key, value, closeTime, timestamp); + } + //update the combined window with the new aggregate + putAndForward(combinedWindow.key.window(), combinedWindow.value, key, value, closeTime, timestamp); + } + //create right window for new record if needed + if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) { + createRightWindow(timestamp, rightWinAgg, key, value, closeTime); } } - private boolean rightWindowIsNotEmpty(final ValueAndTimestamp<Agg> rightWinAgg, final long timestamp) { - return rightWinAgg != null && rightWinAgg.timestamp() > timestamp; + private void createRightWindow(final long timestamp, + final ValueAndTimestamp<Agg> rightWinAgg, + final K key, + final V value, + final long closeTime) { + final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs()); + final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp)); + putAndForward(window, valueAndTime, key, value, closeTime, timestamp); + } + + private boolean leftWindowNotEmpty(final long previousTimestamp, final long currentTimestamp) { + return currentTimestamp - windows.timeDifferenceMs() <= previousTimestamp; } - private boolean isLeftWindow(final KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> window) { - return window.key.window().end() == window.value.timestamp(); + // previous record's 
right window does not already exist and current record falls within previous record's right window + private boolean rightWindowNecessaryAndPossible(final Set<Long> windowStartTimes, + final long previousRightWindowStart, + final long currentRecordTimestamp) { + return !windowStartTimes.contains(previousRightWindowStart) && previousRightWindowStart + windows.timeDifferenceMs() >= currentRecordTimestamp; + } Review comment: This isn't used anymore, right? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org