mjsax commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r485129599



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -117,25 +117,44 @@ public void process(final K key, final V value) {
                 return;
             }
 
-            final long timestamp = context().timestamp();
-            //don't process records that don't fall within a full sliding 
window
-            if (timestamp < windows.timeDifferenceMs()) {
+            final long inputRecordTimestamp = context().timestamp();
+            observedStreamTime = Math.max(observedStreamTime, 
inputRecordTimestamp);
+            final long closeTime = observedStreamTime - 
windows.gracePeriodMs();
+
+            if (inputRecordTimestamp + 1 + windows.timeDifferenceMs() <= 
closeTime) {

Review comment:
       nit: `1` -> `1L`

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -146,96 +165,203 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
             boolean leftWinAlreadyCreated = false;
             boolean rightWinAlreadyCreated = false;
 
-            // keep the left type window closest to the record
-            Window latestLeftTypeWindow = null;
+            Long previousRecordTimestamp = null;
+
             try (
                 final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> 
iterator = windowStore.fetch(
                     key,
                     key,
-                    timestamp - 2 * windows.timeDifferenceMs(),
+                    Math.max(0, inputRecordTimestamp - 2 * 
windows.timeDifferenceMs()),
                     // to catch the current record's right window, if it 
exists, without more calls to the store
-                    timestamp + 1)
+                    inputRecordTimestamp + 1)
             ) {
                 while (iterator.hasNext()) {
-                    final KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next = 
iterator.next();
-                    windowStartTimes.add(next.key.window().start());
-                    final long startTime = next.key.window().start();
+                    final KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> 
windowBeingProcessed = iterator.next();
+                    final long startTime = 
windowBeingProcessed.key.window().start();
+                    windowStartTimes.add(startTime);
                     final long endTime = startTime + 
windows.timeDifferenceMs();
+                    final long windowMaxRecordTimestamp = 
windowBeingProcessed.value.timestamp();
 
-                    if (endTime < timestamp) {
-                        leftWinAgg = next.value;
-                        if (isLeftWindow(next)) {
-                            latestLeftTypeWindow = next.key.window();
-                        }
-                    } else if (endTime == timestamp) {
+                    if (endTime < inputRecordTimestamp) {
+                        leftWinAgg = windowBeingProcessed.value;
+                        previousRecordTimestamp = windowMaxRecordTimestamp;
+                    } else if (endTime == inputRecordTimestamp) {
                         leftWinAlreadyCreated = true;
-                        putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
-                    } else if (endTime > timestamp && startTime <= timestamp) {
-                        rightWinAgg = next.value;
-                        putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
-                    } else {
+                        if (windowMaxRecordTimestamp < inputRecordTimestamp) {
+                            previousRecordTimestamp = windowMaxRecordTimestamp;
+                        }
+                        
updateWindowAndForward(windowBeingProcessed.key.window(), 
windowBeingProcessed.value, key, value, closeTime, inputRecordTimestamp);
+                    } else if (endTime > inputRecordTimestamp && startTime <= 
inputRecordTimestamp) {
+                        rightWinAgg = windowBeingProcessed.value;
+                        
updateWindowAndForward(windowBeingProcessed.key.window(), 
windowBeingProcessed.value, key, value, closeTime, inputRecordTimestamp);
+                    } else if (startTime == inputRecordTimestamp + 1) {
                         rightWinAlreadyCreated = true;
+                    } else {
+                        throw new IllegalStateException("Unexpected window 
found when processing sliding windows");
                     }
                 }
             }
 
             //create right window for previous record
-            if (latestLeftTypeWindow != null) {
-                final long rightWinStart = latestLeftTypeWindow.end() + 1;
-                if (!windowStartTimes.contains(rightWinStart)) {
-                    final TimeWindow window = new TimeWindow(rightWinStart, 
rightWinStart + windows.timeDifferenceMs());
-                    final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(initializer.apply(), timestamp);
-                    putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
+            if (previousRecordTimestamp != null) {
+                final long previousRightWinStart = previousRecordTimestamp + 1;
+                if (rightWindowNecessaryAndPossible(windowStartTimes, 
previousRightWinStart, inputRecordTimestamp)) {
+                    final TimeWindow window = new 
TimeWindow(previousRightWinStart, previousRightWinStart + 
windows.timeDifferenceMs());
+                    final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(initializer.apply(), inputRecordTimestamp);
+                    updateWindowAndForward(window, valueAndTime, key, value, 
closeTime, inputRecordTimestamp);
                 }
             }
 
             //create left window for new record
             if (!leftWinAlreadyCreated) {
                 final ValueAndTimestamp<Agg> valueAndTime;
-                //there's a right window that the new record could create --> 
new record's left window is not empty
-                if (latestLeftTypeWindow != null) {
-                    valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
+                // if there's a right window that the new record could create 
&& previous record falls within left window -> new record's left window is not 
empty
+                if (leftWindowNotEmpty(previousRecordTimestamp, 
inputRecordTimestamp)) {
+                    valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
inputRecordTimestamp);
                 } else {
-                    valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
+                    valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
inputRecordTimestamp);
                 }
-                final TimeWindow window = new TimeWindow(timestamp - 
windows.timeDifferenceMs(), timestamp);
-                putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
+                final TimeWindow window = new TimeWindow(inputRecordTimestamp 
- windows.timeDifferenceMs(), inputRecordTimestamp);
+                updateWindowAndForward(window, valueAndTime, key, value, 
closeTime, inputRecordTimestamp);
             }
-            //create right window for new record
-            if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
-                final TimeWindow window = new TimeWindow(timestamp + 1, 
timestamp + 1 + windows.timeDifferenceMs());
-                final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(getValueOrNull(rightWinAgg), 
Math.max(rightWinAgg.timestamp(), timestamp));
-                putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
+            if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
inputRecordTimestamp)) {
+                createCurrentRecordRightWindow(inputRecordTimestamp, 
rightWinAgg, key);
             }
         }
 
-        private boolean rightWindowIsNotEmpty(final ValueAndTimestamp<Agg> 
rightWinAgg, final long timestamp) {
-            return rightWinAgg != null && rightWinAgg.timestamp() > timestamp;
+        /**
+         * Created to handle records where 0 < inputRecordTimestamp < 
timeDifferenceMs. These records would create
+         * windows with negative start times, which is not supported. Instead, 
we will put them into the [0, timeDifferenceMs]
+         * window as a "workaround", and we will update or create their right 
windows as new records come in later
+         */
+        private void processEarly(final K key, final V value, final long 
inputRecordTimestamp, final long closeTime) {
+            if (inputRecordTimestamp < 0 || inputRecordTimestamp >= 
windows.timeDifferenceMs()) {
+                throw new IllegalArgumentException("Early record for sliding 
windows must fall between 0 < inputRecordTimestamp < timeDifferenceMs");

Review comment:
       nit: `fall between 0 < inputRecordTimestamp` -> `fall between 0 <= inputRecordTimestamp`

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -146,96 +165,203 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
             boolean leftWinAlreadyCreated = false;
             boolean rightWinAlreadyCreated = false;
 
-            // keep the left type window closest to the record
-            Window latestLeftTypeWindow = null;
+            Long previousRecordTimestamp = null;
+
             try (
                 final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> 
iterator = windowStore.fetch(
                     key,
                     key,
-                    timestamp - 2 * windows.timeDifferenceMs(),
+                    Math.max(0, inputRecordTimestamp - 2 * 
windows.timeDifferenceMs()),
                     // to catch the current record's right window, if it 
exists, without more calls to the store
-                    timestamp + 1)
+                    inputRecordTimestamp + 1)
             ) {
                 while (iterator.hasNext()) {
-                    final KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next = 
iterator.next();
-                    windowStartTimes.add(next.key.window().start());
-                    final long startTime = next.key.window().start();
+                    final KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> 
windowBeingProcessed = iterator.next();
+                    final long startTime = 
windowBeingProcessed.key.window().start();
+                    windowStartTimes.add(startTime);
                     final long endTime = startTime + 
windows.timeDifferenceMs();
+                    final long windowMaxRecordTimestamp = 
windowBeingProcessed.value.timestamp();
 
-                    if (endTime < timestamp) {
-                        leftWinAgg = next.value;
-                        if (isLeftWindow(next)) {
-                            latestLeftTypeWindow = next.key.window();
-                        }
-                    } else if (endTime == timestamp) {
+                    if (endTime < inputRecordTimestamp) {
+                        leftWinAgg = windowBeingProcessed.value;
+                        previousRecordTimestamp = windowMaxRecordTimestamp;
+                    } else if (endTime == inputRecordTimestamp) {
                         leftWinAlreadyCreated = true;
-                        putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
-                    } else if (endTime > timestamp && startTime <= timestamp) {
-                        rightWinAgg = next.value;
-                        putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
-                    } else {
+                        if (windowMaxRecordTimestamp < inputRecordTimestamp) {
+                            previousRecordTimestamp = windowMaxRecordTimestamp;
+                        }
+                        
updateWindowAndForward(windowBeingProcessed.key.window(), 
windowBeingProcessed.value, key, value, closeTime, inputRecordTimestamp);
+                    } else if (endTime > inputRecordTimestamp && startTime <= 
inputRecordTimestamp) {
+                        rightWinAgg = windowBeingProcessed.value;
+                        
updateWindowAndForward(windowBeingProcessed.key.window(), 
windowBeingProcessed.value, key, value, closeTime, inputRecordTimestamp);
+                    } else if (startTime == inputRecordTimestamp + 1) {
                         rightWinAlreadyCreated = true;
+                    } else {
+                        throw new IllegalStateException("Unexpected window 
found when processing sliding windows");
                     }
                 }
             }
 
             //create right window for previous record
-            if (latestLeftTypeWindow != null) {
-                final long rightWinStart = latestLeftTypeWindow.end() + 1;
-                if (!windowStartTimes.contains(rightWinStart)) {
-                    final TimeWindow window = new TimeWindow(rightWinStart, 
rightWinStart + windows.timeDifferenceMs());
-                    final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(initializer.apply(), timestamp);
-                    putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
+            if (previousRecordTimestamp != null) {
+                final long previousRightWinStart = previousRecordTimestamp + 1;
+                if (rightWindowNecessaryAndPossible(windowStartTimes, 
previousRightWinStart, inputRecordTimestamp)) {
+                    final TimeWindow window = new 
TimeWindow(previousRightWinStart, previousRightWinStart + 
windows.timeDifferenceMs());
+                    final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(initializer.apply(), inputRecordTimestamp);
+                    updateWindowAndForward(window, valueAndTime, key, value, 
closeTime, inputRecordTimestamp);
                 }
             }
 
             //create left window for new record
             if (!leftWinAlreadyCreated) {
                 final ValueAndTimestamp<Agg> valueAndTime;
-                //there's a right window that the new record could create --> 
new record's left window is not empty
-                if (latestLeftTypeWindow != null) {
-                    valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
+                // if there's a right window that the new record could create 
&& previous record falls within left window -> new record's left window is not 
empty
+                if (leftWindowNotEmpty(previousRecordTimestamp, 
inputRecordTimestamp)) {
+                    valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
inputRecordTimestamp);
                 } else {
-                    valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
+                    valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
inputRecordTimestamp);
                 }
-                final TimeWindow window = new TimeWindow(timestamp - 
windows.timeDifferenceMs(), timestamp);
-                putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
+                final TimeWindow window = new TimeWindow(inputRecordTimestamp 
- windows.timeDifferenceMs(), inputRecordTimestamp);
+                updateWindowAndForward(window, valueAndTime, key, value, 
closeTime, inputRecordTimestamp);
             }
-            //create right window for new record
-            if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
-                final TimeWindow window = new TimeWindow(timestamp + 1, 
timestamp + 1 + windows.timeDifferenceMs());
-                final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(getValueOrNull(rightWinAgg), 
Math.max(rightWinAgg.timestamp(), timestamp));
-                putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
+            if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
inputRecordTimestamp)) {
+                createCurrentRecordRightWindow(inputRecordTimestamp, 
rightWinAgg, key);
             }
         }
 
-        private boolean rightWindowIsNotEmpty(final ValueAndTimestamp<Agg> 
rightWinAgg, final long timestamp) {
-            return rightWinAgg != null && rightWinAgg.timestamp() > timestamp;
+        /**
+         * Created to handle records where 0 < inputRecordTimestamp < 
timeDifferenceMs. These records would create
+         * windows with negative start times, which is not supported. Instead, 
we will put them into the [0, timeDifferenceMs]
+         * window as a "workaround", and we will update or create their right 
windows as new records come in later
+         */
+        private void processEarly(final K key, final V value, final long 
inputRecordTimestamp, final long closeTime) {
+            if (inputRecordTimestamp < 0 || inputRecordTimestamp >= 
windows.timeDifferenceMs()) {
+                throw new IllegalArgumentException("Early record for sliding 
windows must fall between 0 < inputRecordTimestamp < timeDifferenceMs");
+            }
+
+            // A window from [0, timeDifferenceMs] that holds all early records
+            KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = 
null;
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+            boolean rightWinAlreadyCreated = false;
+            final Set<Long> windowStartTimes = new HashSet<>();
+
+            Long previousRecordTimestamp = null;
+
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> 
iterator = windowStore.fetch(
+                    key,
+                    key,
+                    0,
+                    // to catch the current record's right window, if it 
exists, without more calls to the store
+                    inputRecordTimestamp + 1)
+            ) {
+                while (iterator.hasNext()) {
+                    final KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> 
windowBeingProcessed = iterator.next();
+                    final long startTime = 
windowBeingProcessed.key.window().start();
+                    windowStartTimes.add(startTime);
+                    final long windowMaxRecordTimestamp = 
windowBeingProcessed.value.timestamp();
+
+                    if (startTime == 0) {
+                        combinedWindow = windowBeingProcessed;
+                        //We don't need to store previousRecordTimestamp if 
maxRecordTimestamp > timestamp

Review comment:
       nit: `maxRecordTimestamp > timestamp` -> `maxRecordTimestamp >= timestamp`
   
   nit: missing space after the comment marker: `//We` -> `// We`

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##########
@@ -439,6 +442,185 @@ public void testJoin() {
         }
     }
 
+    @SuppressWarnings("unchecked")

Review comment:
       Why do we need to suppress the `unchecked` warning here?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to