ableegoldman commented on a change in pull request #9157: URL: https://github.com/apache/kafka/pull/9157#discussion_r475985073
########## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java ########## @@ -328,6 +328,68 @@ public void testAggregateLargeInput() { ); } + @Test + public void testEarlyRecords() { Review comment: Can we add maybe one or two more tests? I think at the least we should have one test that processes _only_ early records, and one test that covers input(s) with the same timestamp. ########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ########## @@ -148,7 +153,7 @@ public void processInOrder(final K key, final V value, final long timestamp) { boolean rightWinAlreadyCreated = false; // keep the left type window closest to the record - Window latestLeftTypeWindow = null; + KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> latestLeftTypeWindow = null; try ( final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch( Review comment: We need to make sure the `fetch` bounds don't go into the negative. We only call `processEarly` if the record's timestamp is within the timeDifferenceMs, but here we search starting at timestamp - 2*timeDifferenceMs ########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ########## @@ -210,6 +216,66 @@ public void processInOrder(final K key, final V value, final long timestamp) { } } + /** + * Created to handle records that have a timestamp > 0 but < timeDifference. These records would create + * windows with negative start times, which is not supported. 
Instead, they will fall within the [0, timeDifference] + * window, and we will update their right windows as new records come in later + */ + private void processEarly(final K key, final V value, final long timestamp, final long closeTime) { + ValueAndTimestamp<Agg> rightWinAgg = null; + KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = null; + boolean rightWinAlreadyCreated = false; + final HashSet<Long> windowStartTimes = new HashSet<Long>(); + + try ( + final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch( + key, + key, + timestamp - 2 * windows.timeDifferenceMs(), + // to catch the current record's right window, if it exists, without more calls to the store + timestamp + 1) + ) { + KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next; + while (iterator.hasNext()) { + next = iterator.next(); + windowStartTimes.add(next.key.window().start()); + final long startTime = next.key.window().start(); + final long endTime = startTime + windows.timeDifferenceMs(); + + if (endTime == windows.timeDifferenceMs()) { + combinedWindow = next; + } else if (endTime > timestamp && startTime <= timestamp) { + rightWinAgg = next.value; + putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp); + } else { Review comment: It took me a second to get this -- can we explicitly check `if startTime == timestamp + 1` instead of falling back to `else` and implicitly relying on the fetch bounds? You can just get rid of the `else` altogether or throw an IllegalStateException if none of the specific conditions are met and the else is reached, whatever makes sense to you ########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ########## @@ -210,6 +216,66 @@ public void processInOrder(final K key, final V value, final long timestamp) { } } + /** + * Created to handle records that have a timestamp > 0 but < timeDifference. 
These records would create + * windows with negative start times, which is not supported. Instead, they will fall within the [0, timeDifference] + * window, and we will update their right windows as new records come in later + */ + private void processEarly(final K key, final V value, final long timestamp, final long closeTime) { + ValueAndTimestamp<Agg> rightWinAgg = null; + KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = null; + boolean rightWinAlreadyCreated = false; + final HashSet<Long> windowStartTimes = new HashSet<Long>(); + + try ( + final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch( + key, + key, + timestamp - 2 * windows.timeDifferenceMs(), + // to catch the current record's right window, if it exists, without more calls to the store + timestamp + 1) + ) { + KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next; + while (iterator.hasNext()) { + next = iterator.next(); + windowStartTimes.add(next.key.window().start()); + final long startTime = next.key.window().start(); + final long endTime = startTime + windows.timeDifferenceMs(); + + if (endTime == windows.timeDifferenceMs()) { + combinedWindow = next; + } else if (endTime > timestamp && startTime <= timestamp) { + rightWinAgg = next.value; + putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp); + } else { + rightWinAlreadyCreated = true; + } + } + } + + if (combinedWindow == null) { + final TimeWindow window = new TimeWindow(0, windows.timeDifferenceMs()); + final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); + putAndForward(window, valueAndTime, key, value, closeTime, timestamp); + + } else { + //create the right window for the most recent max timestamp in the combined window Review comment: > most recent max timestamp Huh? 
I think I know what you're trying to say here but it seems like two different phrases got a bit mixed up here ########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ########## @@ -210,6 +216,66 @@ public void processInOrder(final K key, final V value, final long timestamp) { } } + /** + * Created to handle records that have a timestamp > 0 but < timeDifference. These records would create + * windows with negative start times, which is not supported. Instead, they will fall within the [0, timeDifference] + * window, and we will update their right windows as new records come in later + */ + private void processEarly(final K key, final V value, final long timestamp, final long closeTime) { + ValueAndTimestamp<Agg> rightWinAgg = null; + KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = null; + boolean rightWinAlreadyCreated = false; + final HashSet<Long> windowStartTimes = new HashSet<Long>(); + + try ( + final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch( + key, + key, + timestamp - 2 * windows.timeDifferenceMs(), + // to catch the current record's right window, if it exists, without more calls to the store + timestamp + 1) + ) { + KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next; + while (iterator.hasNext()) { + next = iterator.next(); + windowStartTimes.add(next.key.window().start()); + final long startTime = next.key.window().start(); + final long endTime = startTime + windows.timeDifferenceMs(); + + if (endTime == windows.timeDifferenceMs()) { Review comment: nit: can we make this `if startTime == 0` ? That seems slightly easier to understand, and then all the conditionals can be in terms of startTime which is a bit more intuitive since that's what we're iterating over. 
Context switching between startTime and endTime kind of makes me lose my train of thought. ########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ########## @@ -183,7 +188,8 @@ public void processInOrder(final K key, final V value, final long timestamp) { //create right window for previous record if (latestLeftTypeWindow != null) { - final long rightWinStart = latestLeftTypeWindow.end() + 1; + final long leftWindowEnd = latestLeftTypeWindow.key.window().end(); Review comment: This is really just the timestamp of the previous record, right? Can we call it something that reflects that? ########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ########## @@ -211,6 +217,67 @@ public void processInOrder(final K key, final V value, final long timestamp) { } } + /** + * Created to handle records that have a timestamp > 0 but < timeDifference. These records would create + * windows with negative start times, which is not supported. 
Instead, they will fall within the [0, timeDifference] + * window, and we will update their right windows as new records come in later + */ + private void processEarly(final K key, final V value, final long timestamp, final long closeTime) { + ValueAndTimestamp<Agg> rightWinAgg = null; + KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = null; + boolean rightWinAlreadyCreated = false; + final HashSet<Long> windowStartTimes = new HashSet<Long>(); + + try ( + final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch( + key, + key, + timestamp - 2 * windows.timeDifferenceMs(), Review comment: I think this fetch might break if you go into the negatives, should just fetch starting from 0 ########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ########## @@ -210,6 +216,66 @@ public void processInOrder(final K key, final V value, final long timestamp) { } } + /** + * Created to handle records that have a timestamp > 0 but < timeDifference. These records would create + * windows with negative start times, which is not supported. Instead, they will fall within the [0, timeDifference] + * window, and we will update their right windows as new records come in later + */ + private void processEarly(final K key, final V value, final long timestamp, final long closeTime) { + ValueAndTimestamp<Agg> rightWinAgg = null; + KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = null; Review comment: This is just a window from [0, timeDifferenceMs] that stores the aggregation of all the "early" records, right? 
I can't really think of a more descriptive name so can we just leave a comment explaining what it's for ########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ########## @@ -210,6 +216,66 @@ public void processInOrder(final K key, final V value, final long timestamp) { } } + /** + * Created to handle records that have a timestamp > 0 but < timeDifference. These records would create + * windows with negative start times, which is not supported. Instead, they will fall within the [0, timeDifference] + * window, and we will update their right windows as new records come in later + */ + private void processEarly(final K key, final V value, final long timestamp, final long closeTime) { + ValueAndTimestamp<Agg> rightWinAgg = null; + KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = null; + boolean rightWinAlreadyCreated = false; + final HashSet<Long> windowStartTimes = new HashSet<Long>(); + + try ( + final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch( + key, + key, + timestamp - 2 * windows.timeDifferenceMs(), + // to catch the current record's right window, if it exists, without more calls to the store + timestamp + 1) + ) { + KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next; + while (iterator.hasNext()) { + next = iterator.next(); + windowStartTimes.add(next.key.window().start()); + final long startTime = next.key.window().start(); + final long endTime = startTime + windows.timeDifferenceMs(); + + if (endTime == windows.timeDifferenceMs()) { + combinedWindow = next; + } else if (endTime > timestamp && startTime <= timestamp) { + rightWinAgg = next.value; + putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp); + } else { + rightWinAlreadyCreated = true; + } + } + } + + if (combinedWindow == null) { + final TimeWindow window = new TimeWindow(0, windows.timeDifferenceMs()); + final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(initializer.apply(), timestamp); + putAndForward(window, valueAndTime, key, value, closeTime, timestamp); + + } else { + //create the right window for the most recent max timestamp in the combined window + final long rightWinStart = combinedWindow.value.timestamp() + 1; + if (!windowStartTimes.contains(rightWinStart) && combinedWindow.value.timestamp() < timestamp) { + final TimeWindow window = new TimeWindow(rightWinStart, rightWinStart + windows.timeDifferenceMs()); + final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); + putAndForward(window, valueAndTime, key, value, closeTime, timestamp); + } + //update the combined window with the new aggregated Review comment: nit: `aggregated` -> `aggregate` ########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ########## @@ -210,6 +216,66 @@ public void processInOrder(final K key, final V value, final long timestamp) { } } + /** + * Created to handle records that have a timestamp > 0 but < timeDifference. These records would create + * windows with negative start times, which is not supported. 
Instead, they will fall within the [0, timeDifference] + * window, and we will update their right windows as new records come in later + */ + private void processEarly(final K key, final V value, final long timestamp, final long closeTime) { + ValueAndTimestamp<Agg> rightWinAgg = null; + KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = null; + boolean rightWinAlreadyCreated = false; + final HashSet<Long> windowStartTimes = new HashSet<Long>(); + + try ( + final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch( + key, + key, + timestamp - 2 * windows.timeDifferenceMs(), + // to catch the current record's right window, if it exists, without more calls to the store + timestamp + 1) + ) { + KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next; + while (iterator.hasNext()) { + next = iterator.next(); + windowStartTimes.add(next.key.window().start()); + final long startTime = next.key.window().start(); + final long endTime = startTime + windows.timeDifferenceMs(); + + if (endTime == windows.timeDifferenceMs()) { + combinedWindow = next; + } else if (endTime > timestamp && startTime <= timestamp) { + rightWinAgg = next.value; + putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp); + } else { + rightWinAlreadyCreated = true; + } + } + } + + if (combinedWindow == null) { + final TimeWindow window = new TimeWindow(0, windows.timeDifferenceMs()); + final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); + putAndForward(window, valueAndTime, key, value, closeTime, timestamp); + + } else { + //create the right window for the most recent max timestamp in the combined window + final long rightWinStart = combinedWindow.value.timestamp() + 1; Review comment: This name keeps throwing me off...right window of what? Is it like `previousRecordRightWindow`? 
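To make the naming question concrete, here is a minimal sketch of what `rightWinStart` appears to represent in the hunk above: the start of the right window of the previous record, where the previous record's timestamp is read from `combinedWindow.value.timestamp()`. The class and method names below are hypothetical and are not part of the PR; only the `+ 1` and `+ timeDifferenceMs` arithmetic is taken from the diff.

```java
// Hypothetical helper, for illustration only; not code from the PR.
final class PreviousRecordRightWindow {

    // A record's right window starts one millisecond after the record itself.
    static long start(final long previousRecordTimestamp) {
        return previousRecordTimestamp + 1;
    }

    // The window spans timeDifferenceMs, mirroring
    // new TimeWindow(rightWinStart, rightWinStart + windows.timeDifferenceMs()) in the diff.
    static long end(final long previousRecordTimestamp, final long timeDifferenceMs) {
        return start(previousRecordTimestamp) + timeDifferenceMs;
    }

    public static void main(final String[] args) {
        final long previousRecordTimestamp = 3L;
        final long timeDifferenceMs = 10L;
        System.out.println("previous record's right window: ["
            + start(previousRecordTimestamp) + ", "
            + end(previousRecordTimestamp, timeDifferenceMs) + "]");
    }
}
```

A name along the lines of `previousRecordRightWindowStart` would make the relationship to the previous record visible at the call site.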
########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ########## @@ -210,6 +216,66 @@ public void processInOrder(final K key, final V value, final long timestamp) { } } + /** + * Created to handle records that have a timestamp > 0 but < timeDifference. These records would create + * windows with negative start times, which is not supported. Instead, they will fall within the [0, timeDifference] + * window, and we will update their right windows as new records come in later + */ + private void processEarly(final K key, final V value, final long timestamp, final long closeTime) { + ValueAndTimestamp<Agg> rightWinAgg = null; + KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = null; + boolean rightWinAlreadyCreated = false; + final HashSet<Long> windowStartTimes = new HashSet<Long>(); + + try ( + final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch( + key, + key, + timestamp - 2 * windows.timeDifferenceMs(), + // to catch the current record's right window, if it exists, without more calls to the store + timestamp + 1) + ) { + KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next; + while (iterator.hasNext()) { + next = iterator.next(); + windowStartTimes.add(next.key.window().start()); + final long startTime = next.key.window().start(); + final long endTime = startTime + windows.timeDifferenceMs(); + + if (endTime == windows.timeDifferenceMs()) { + combinedWindow = next; + } else if (endTime > timestamp && startTime <= timestamp) { + rightWinAgg = next.value; + putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp); + } else { + rightWinAlreadyCreated = true; + } + } + } + + if (combinedWindow == null) { + final TimeWindow window = new TimeWindow(0, windows.timeDifferenceMs()); + final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); + putAndForward(window, valueAndTime, key, value, 
closeTime, timestamp); + + } else { + //create the right window for the most recent max timestamp in the combined window + final long rightWinStart = combinedWindow.value.timestamp() + 1; + if (!windowStartTimes.contains(rightWinStart) && combinedWindow.value.timestamp() < timestamp) { Review comment: It's not immediately obvious why this is correct/captures all possible cases so we should leave a comment, or better yet factor out this condition into a descriptively named method (or best yet, do both). I was concerned about out-of-order records, since in that case the previous record would obviously not be the one with the maximum timestamp in the combined window. But I realized that we actually never need to create a previous record's right window for out-of-order early records, since there's no way for a full timeDifferenceMs to fit between the previous record and whatever the max record in the combined window is. So, all we need to do is make sure we only try to create the previous record's right window if the current record is the maximum record within the combined window, i.e. the `combinedWindow.value.timestamp() < timestamp` check. But that's a very long and pretty ineloquent comment to leave in the code. Hopefully you can come up with a more concise explanation.
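As a rough sketch of two suggestions from this review (factoring the right-window condition into a descriptively named method, and keeping the `fetch` lower bound from going negative), something along these lines might work. The class name, method names, and parameter names are hypothetical and not from the PR; the boolean condition and the `timestamp - 2 * timeDifferenceMs` bound are copied from the diff, and the `Math.max` clamp is one assumed way to "fetch starting from 0".

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical helpers, for illustration only; not code from the PR.
final class EarlyRecordSketch {

    // The previous record's right window only needs to be created when
    // (1) the current record is newer than every record already in the combined window, and
    // (2) no window starting at that position exists yet.
    // Out-of-order early records fail (1), so they never create it.
    static boolean previousRecordRightWindowNeeded(final Set<Long> windowStartTimes,
                                                   final long combinedWindowMaxTimestamp,
                                                   final long timestamp) {
        final long rightWinStart = combinedWindowMaxTimestamp + 1;
        return combinedWindowMaxTimestamp < timestamp
            && !windowStartTimes.contains(rightWinStart);
    }

    // Clamp the lower fetch bound at 0 so early records never query negative time.
    static long clampedFetchStart(final long timestamp, final long timeDifferenceMs) {
        return Math.max(0L, timestamp - 2 * timeDifferenceMs);
    }

    public static void main(final String[] args) {
        final Set<Long> windowStartTimes = new HashSet<>(Arrays.asList(0L));
        System.out.println(previousRecordRightWindowNeeded(windowStartTimes, 3L, 5L));
        System.out.println(clampedFetchStart(5L, 10L));
    }
}
```

With a helper like the first one, the short comment on the method can carry the reasoning about out-of-order records, and the call site in `processEarly` stays readable.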