lct45 commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r485053442
##########
File path:
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -192,29 +213,125 @@ public void processInOrder(final K key, final V value,
final long timestamp) {
//create left window for new record
if (!leftWinAlreadyCreated) {
final ValueAndTimestamp<Agg> valueAndTime;
- //there's a right window that the new record could create -->
new record's left window is not empty
- if (latestLeftTypeWindow != null) {
+ // if there's a right window that the new record could create
&& previous record falls within left window -> new record's left window is not
empty
+ if (previousRecordTimestamp != null &&
leftWindowNotEmpty(previousRecordTimestamp, timestamp)) {
valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(),
timestamp);
} else {
valueAndTime = ValueAndTimestamp.make(initializer.apply(),
timestamp);
}
final TimeWindow window = new TimeWindow(timestamp -
windows.timeDifferenceMs(), timestamp);
putAndForward(window, valueAndTime, key, value, closeTime,
timestamp);
}
- //create right window for new record
if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg,
timestamp)) {
- final TimeWindow window = new TimeWindow(timestamp + 1,
timestamp + 1 + windows.timeDifferenceMs());
- final ValueAndTimestamp<Agg> valueAndTime =
ValueAndTimestamp.make(getValueOrNull(rightWinAgg),
Math.max(rightWinAgg.timestamp(), timestamp));
+ createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+ }
+ }
+
+ /**
+ * Created to handle records where 0 < timestamp < timeDifferenceMs.
These records would create
+ * windows with negative start times, which is not supported. Instead,
they will fall within the [0, timeDifferenceMs]
+ * window, and we will update or create their right windows as new
records come in later
+ */
+ private void processEarly(final K key, final V value, final long
timestamp, final long closeTime) {
+ // A window from [0, timeDifferenceMs] that holds all early records
+ KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow =
null;
+ ValueAndTimestamp<Agg> rightWinAgg = null;
+ boolean rightWinAlreadyCreated = false;
+ final Set<Long> windowStartTimes = new HashSet<>();
+
+ Long previousRecordTimestamp = null;
+
+ try (
+ final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>>
iterator = windowStore.fetch(
+ key,
+ key,
+ Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+ // to catch the current record's right window, if it
exists, without more calls to the store
+ timestamp + 1)
+ ) {
+ KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+ while (iterator.hasNext()) {
+ next = iterator.next();
+ windowStartTimes.add(next.key.window().start());
+ final long startTime = next.key.window().start();
+ final long windowMaxRecordTimestamp =
next.value.timestamp();
+
+ if (startTime == 0) {
+ combinedWindow = next;
+ if (windowMaxRecordTimestamp < timestamp) {
+ // If maxRecordTimestamp > timestamp, the current
record is out-of-order, meaning that the
+ // previous record's right window would have been
created already by other records. This
+ // will always be true for early records, as they
all fall within [0, timeDifferenceMs].
+ previousRecordTimestamp = windowMaxRecordTimestamp;
+ }
+
+ } else if (startTime <= timestamp) {
+ rightWinAgg = next.value;
+ putAndForward(next.key.window(), next.value, key,
value, closeTime, timestamp);
+ } else if (startTime == timestamp + 1) {
+ rightWinAlreadyCreated = true;
+ }
+ }
+ }
+
+ // if there wasn't a right window agg found and we need a right
window for our new record,
+ // the current aggregate in the combined window will go in the new
record's right window
+ if (rightWinAgg == null && combinedWindow != null &&
combinedWindow.value.timestamp() > timestamp) {
+ rightWinAgg = combinedWindow.value;
+ }
+
+ //create right window for new record if needed
+ if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg,
timestamp)) {
+ createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+ }
+
+ //create the right window for the previous record if the previous
record exists and the window hasn't already been created
+ if (previousRecordTimestamp != null &&
!windowStartTimes.contains(previousRecordTimestamp + 1)) {
+ final TimeWindow window = new
TimeWindow(previousRecordTimestamp + 1, previousRecordTimestamp + 1 +
windows.timeDifferenceMs());
+ final ValueAndTimestamp<Agg> valueAndTime =
ValueAndTimestamp.make(initializer.apply(), timestamp);
+ putAndForward(window, valueAndTime, key, value, closeTime,
timestamp);
+ }
+
+ if (combinedWindow == null) {
+ final TimeWindow window = new TimeWindow(0,
windows.timeDifferenceMs());
+ final ValueAndTimestamp<Agg> valueAndTime =
ValueAndTimestamp.make(initializer.apply(), timestamp);
putAndForward(window, valueAndTime, key, value, closeTime,
timestamp);
+
+ } else {
+ //update the combined window with the new aggregate
+ putAndForward(combinedWindow.key.window(),
combinedWindow.value, key, value, closeTime, timestamp);
}
+
}
- private boolean rightWindowIsNotEmpty(final ValueAndTimestamp<Agg>
rightWinAgg, final long timestamp) {
- return rightWinAgg != null && rightWinAgg.timestamp() > timestamp;
+ private void createCurrentRecordRightWindow(final long timestamp,
+ final
ValueAndTimestamp<Agg> rightWinAgg,
+ final K key) {
+ final TimeWindow window = new TimeWindow(timestamp + 1, timestamp
+ 1 + windows.timeDifferenceMs());
+ windowStore.put(
+ key,
+ rightWinAgg,
+ window.start());
+ tupleForwarder.maybeForward(
+ new Windowed<>(key, window),
+ rightWinAgg.value(),
+ null,
+ rightWinAgg.timestamp());
+ }
+
+ private boolean leftWindowNotEmpty(final long previousTimestamp, final
long currentTimestamp) {
+ return currentTimestamp - windows.timeDifferenceMs() <=
previousTimestamp;
+ }
+
+ // previous record's right window does not already exist and current
record falls within previous record's right window
+ private boolean rightWindowNecessaryAndPossible(final Set<Long>
windowStartTimes,
+ final long
previousRightWindowStart,
+ final long
currentRecordTimestamp) {
+ return !windowStartTimes.contains(previousRightWindowStart) &&
previousRightWindowStart + windows.timeDifferenceMs() >= currentRecordTimestamp;
}
- private boolean isLeftWindow(final KeyValue<Windowed<K>,
ValueAndTimestamp<Agg>> window) {
- return window.key.window().end() == window.value.timestamp();
+ private boolean rightWindowIsNotEmpty(final ValueAndTimestamp<Agg>
rightWinAgg, final long timestamp) {
+ return rightWinAgg != null && rightWinAgg.timestamp() > timestamp;
Review comment:
I think the catch is that it's only _sometimes_ the aggregate that goes
in the current record's right window. Sometimes we don't use the value in
`rightWinAgg` because the right window is empty or doesn't exist, but if there
_is_ a right window aggregate, it will be in `rightWinAgg`.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]