lct45 commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r483718480



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -192,29 +191,113 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
             //create left window for new record
             if (!leftWinAlreadyCreated) {
                 final ValueAndTimestamp<Agg> valueAndTime;
-                //there's a right window that the new record could create --> 
new record's left window is not empty
-                if (latestLeftTypeWindow != null) {
+                // if there's a right window that the new record could create 
&& previous record falls within left window -> new record's left window is not 
empty
+                if (previousRecord != null && 
leftWindowNotEmpty(previousRecord, timestamp)) {
                     valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
                 } else {
                     valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
                 }
                 final TimeWindow window = new TimeWindow(timestamp - 
windows.timeDifferenceMs(), timestamp);
                 putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
             }
-            //create right window for new record
             if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
-                final TimeWindow window = new TimeWindow(timestamp + 1, 
timestamp + 1 + windows.timeDifferenceMs());
-                final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(getValueOrNull(rightWinAgg), 
Math.max(rightWinAgg.timestamp(), timestamp));
+                createRightWindow(timestamp, rightWinAgg, key, value, 
closeTime);
+            }
+        }
+
+        /**
+         * Created to handle records that have a timestamp > 0 but < 
timeDifference. These records would create
+         * windows with negative start times, which is not supported. Instead, 
they will fall within the [0, timeDifference]
+         * window, and we will update their right windows as new records come 
in later
+         */
+        private void processEarly(final K key, final V value, final long 
timestamp, final long closeTime) {
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+            //window from [0,timeDifference] that holds all early records
+            KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = 
null;
+            boolean rightWinAlreadyCreated = false;
+            final Set<Long> windowStartTimes = new HashSet<>();
+
+            Long previousRecordTimestamp = null;
+
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> 
iterator = windowStore.fetch(
+                    key,
+                    key,
+                    Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+                    // to catch the current record's right window, if it 
exists, without more calls to the store
+                    timestamp + 1)
+            ) {
+                KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+                while (iterator.hasNext()) {
+                    next = iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+                    final long startTime = next.key.window().start();
+
+                    if (startTime == 0) {
+                        combinedWindow = next;
+                        if (next.value.timestamp() < timestamp) {
+                            previousRecordTimestamp = next.value.timestamp();
+                        }
+
+                    } else if (startTime <= timestamp) {
+                        rightWinAgg = next.value;
+                        putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+                    } else if (startTime == timestamp + 1) {
+                        rightWinAlreadyCreated = true;
+                    }
+                }
+            }
+
+            // if there wasn't a right window agg found and we need a right 
window for our new record,
+            // the current aggregate in the combined window will go in the new 
record's right window
+            if (rightWinAgg == null && combinedWindow != null && 
combinedWindow.value.timestamp() > timestamp) {

Review comment:
       Updated the comment in an attempt to do a mini-proof:
   `If there wasn't a right window agg found and we need a right window for our 
new record, the current aggregate in the combined window will go in the new 
record's right window. We can be sure that the combined window only holds 
records that fall into the current record's right window for two reasons:
   1. If there were records earlier than the current record AND later than the 
current record, there would be a right window found when we looked for right 
window agg. 
   2. If there was only a record before the current record, we wouldn't need a 
right window for the current record and wouldn't update the rightWinAgg value 
here, as the combinedWindow.value.timestamp() < inputRecordTimestamp

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -192,29 +191,113 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
             //create left window for new record
             if (!leftWinAlreadyCreated) {
                 final ValueAndTimestamp<Agg> valueAndTime;
-                //there's a right window that the new record could create --> 
new record's left window is not empty
-                if (latestLeftTypeWindow != null) {
+                // if there's a right window that the new record could create 
&& previous record falls within left window -> new record's left window is not 
empty
+                if (previousRecord != null && 
leftWindowNotEmpty(previousRecord, timestamp)) {
                     valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
                 } else {
                     valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
                 }
                 final TimeWindow window = new TimeWindow(timestamp - 
windows.timeDifferenceMs(), timestamp);
                 putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
             }
-            //create right window for new record
             if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
-                final TimeWindow window = new TimeWindow(timestamp + 1, 
timestamp + 1 + windows.timeDifferenceMs());
-                final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(getValueOrNull(rightWinAgg), 
Math.max(rightWinAgg.timestamp(), timestamp));
+                createRightWindow(timestamp, rightWinAgg, key, value, 
closeTime);
+            }
+        }
+
+        /**
+         * Created to handle records that have a timestamp > 0 but < 
timeDifference. These records would create
+         * windows with negative start times, which is not supported. Instead, 
they will fall within the [0, timeDifference]
+         * window, and we will update their right windows as new records come 
in later
+         */
+        private void processEarly(final K key, final V value, final long 
timestamp, final long closeTime) {
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+            //window from [0,timeDifference] that holds all early records
+            KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = 
null;
+            boolean rightWinAlreadyCreated = false;
+            final Set<Long> windowStartTimes = new HashSet<>();
+
+            Long previousRecordTimestamp = null;
+
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> 
iterator = windowStore.fetch(
+                    key,
+                    key,
+                    Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+                    // to catch the current record's right window, if it 
exists, without more calls to the store
+                    timestamp + 1)
+            ) {
+                KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+                while (iterator.hasNext()) {
+                    next = iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+                    final long startTime = next.key.window().start();
+
+                    if (startTime == 0) {
+                        combinedWindow = next;
+                        if (next.value.timestamp() < timestamp) {
+                            previousRecordTimestamp = next.value.timestamp();
+                        }
+
+                    } else if (startTime <= timestamp) {
+                        rightWinAgg = next.value;
+                        putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+                    } else if (startTime == timestamp + 1) {
+                        rightWinAlreadyCreated = true;
+                    }
+                }
+            }
+
+            // if there wasn't a right window agg found and we need a right 
window for our new record,
+            // the current aggregate in the combined window will go in the new 
record's right window
+            if (rightWinAgg == null && combinedWindow != null && 
combinedWindow.value.timestamp() > timestamp) {

Review comment:
       Updated the comment in an attempt to do a mini-proof:
   `If there wasn't a right window agg found and we need a right window for our 
new record, the current aggregate in the combined window will go in the new 
record's right window. We can be sure that the combined window only holds 
records that fall into the current record's right window for two reasons:
   1. If there were records earlier than the current record AND later than the 
current record, there would be a right window found when we looked for right 
window agg. 
   2. If there was only a record before the current record, we wouldn't need a 
right window for the current record and wouldn't update the rightWinAgg value 
here, as the combinedWindow.value.timestamp() < inputRecordTimestamp`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to