ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r475985073



##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##########
@@ -328,6 +328,68 @@ public void testAggregateLargeInput() {
         );
     }
 
+    @Test
+    public void testEarlyRecords() {

Review comment:
       Can we add maybe one or two more tests? I think at least we should have one test that processes _only_ early records, and one test that covers inputs with the same timestamp.
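Both tests need expected values to compare against. One option is a small brute-force oracle. The sketch below assumes a count aggregation and windows that include both endpoints, i.e. `[start, start + timeDifferenceMs]`. `SlidingWindowCountOracle` and `expectedCount` are hypothetical names, not Kafka Streams API.

```java
import java.util.Arrays;

/**
 * Hypothetical brute-force oracle for sliding-window COUNT tests.
 * Assumes windows include both ends: [start, start + timeDifferenceMs].
 */
public final class SlidingWindowCountOracle {

    private SlidingWindowCountOracle() { }

    /** Counts the records whose timestamp falls inside [start, start + timeDifferenceMs]. */
    static long expectedCount(final long[] timestamps, final long start, final long timeDifferenceMs) {
        final long end = start + timeDifferenceMs;
        return Arrays.stream(timestamps)
            .filter(t -> t >= start && t <= end)
            .count();
    }

    public static void main(final String[] args) {
        // Only early records (all below timeDifferenceMs = 10): all land in [0, 10].
        System.out.println(expectedCount(new long[] {3, 5, 7}, 0, 10));
        // Three records sharing timestamp 15 all fall in the left window [5, 15].
        System.out.println(expectedCount(new long[] {15, 15, 15}, 5, 10));
    }
}
```

A test would then assert that the aggregate forwarded for each window matches `expectedCount` for that window's start.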

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -148,7 +153,7 @@ public void processInOrder(final K key, final V value, final long timestamp) {
             boolean rightWinAlreadyCreated = false;
 
             // keep the left type window closest to the record
-            Window latestLeftTypeWindow = null;
+            KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> latestLeftTypeWindow = null;
             try (
                     final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch(

Review comment:
       We need to make sure the `fetch` bounds don't go negative. We only call `processEarly` if the record's timestamp is less than timeDifferenceMs, but here we start the search at `timestamp - 2 * timeDifferenceMs`, which is then always negative.
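A minimal sketch of clamping the lower bound, written as a standalone function so it can be checked in isolation. `EarlyFetchBounds.lowerBound` is a hypothetical name; in the PR the `Math.max(0, ...)` would simply be inlined into the `windowStore.fetch(...)` call.

```java
/**
 * Hypothetical helper (not in the PR) computing the lower bound for the fetch in
 * processEarly. Early records have timestamp < timeDifferenceMs, so
 * timestamp - 2 * timeDifferenceMs is always negative for them and the clamp yields 0.
 */
public final class EarlyFetchBounds {

    private EarlyFetchBounds() { }

    static long lowerBound(final long timestamp, final long timeDifferenceMs) {
        // Never let the search start before time 0.
        return Math.max(0L, timestamp - 2 * timeDifferenceMs);
    }

    public static void main(final String[] args) {
        System.out.println(lowerBound(5, 10));  // early record: clamped to 0
        System.out.println(lowerBound(25, 10)); // in-order record: 25 - 20 = 5, unchanged
    }
}
```

For early records the clamp always yields 0, so passing a literal 0 as the lower bound would behave the same.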
   

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -210,6 +216,66 @@ public void processInOrder(final K key, final V value, final long timestamp) {
             }
         }
 
+        /**
+         * Created to handle records that have a timestamp > 0 but < timeDifference. These records would create
+         * windows with negative start times, which is not supported. Instead, they will fall within the [0, timeDifference]
+         * window, and we will update their right windows as new records come in later
+         */
+        private void processEarly(final K key, final V value, final long timestamp, final long closeTime) {
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+            KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = null;
+            boolean rightWinAlreadyCreated = false;
+            final HashSet<Long> windowStartTimes = new HashSet<Long>();
+
+            try (
+                    final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch(
+                            key,
+                            key,
+                            timestamp - 2 * windows.timeDifferenceMs(),
+                            // to catch the current record's right window, if it exists, without more calls to the store
+                            timestamp + 1)
+            ) {
+                KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+                while (iterator.hasNext()) {
+                    next = iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+                    final long startTime = next.key.window().start();
+                    final long endTime = startTime + windows.timeDifferenceMs();
+
+                    if (endTime == windows.timeDifferenceMs()) {
+                        combinedWindow = next;
+                    } else if (endTime > timestamp && startTime <= timestamp) {
+                        rightWinAgg = next.value;
+                        putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
+                    } else {

Review comment:
       It took me a second to get this. Can we explicitly check `if startTime == timestamp + 1` instead of falling back to `else` and implicitly relying on the fetch bounds? You can just get rid of the `else` altogether, or throw an IllegalStateException if none of the specific conditions are met and the `else` is reached, whatever makes sense to you.
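One way the explicit branches could look, sketched as a pure function over the window start time. It assumes the store was fetched over start times `[0, timestamp + 1]` and that `timestamp < timeDifferenceMs`. `EarlyWindowClassifier`, `WindowKind` and `classify` are hypothetical names, not from the PR.

```java
/**
 * Hypothetical sketch of the explicit branches for the loop in processEarly.
 * Assumes fetched window starts lie in [0, timestamp + 1] and timestamp < timeDifferenceMs.
 */
public final class EarlyWindowClassifier {

    enum WindowKind {
        COMBINED,             // the [0, timeDifferenceMs] window holding all early records
        CONTAINS_RECORD,      // an existing window the current record falls into
        CURRENT_RECORD_RIGHT  // the current record's own right window, already created
    }

    private EarlyWindowClassifier() { }

    static WindowKind classify(final long startTime, final long timestamp, final long timeDifferenceMs) {
        if (startTime == 0) {
            return WindowKind.COMBINED;
        } else if (startTime <= timestamp && startTime + timeDifferenceMs > timestamp) {
            return WindowKind.CONTAINS_RECORD;
        } else if (startTime == timestamp + 1) {
            return WindowKind.CURRENT_RECORD_RIGHT;
        }
        // Unreachable if the fetch bounds are correct; fail loudly instead of guessing.
        throw new IllegalStateException(
            "Unexpected window start " + startTime + " for early record at " + timestamp);
    }

    public static void main(final String[] args) {
        System.out.println(classify(0, 5, 10)); // COMBINED
        System.out.println(classify(3, 5, 10)); // CONTAINS_RECORD
        System.out.println(classify(6, 5, 10)); // CURRENT_RECORD_RIGHT
    }
}
```

Checking `startTime == 0` is equivalent to the original `endTime == windows.timeDifferenceMs()`, since `endTime` is defined as `startTime + windows.timeDifferenceMs()`.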

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -210,6 +216,66 @@ public void processInOrder(final K key, final V value, final long timestamp) {
             }
         }
 
+        /**
+         * Created to handle records that have a timestamp > 0 but < timeDifference. These records would create
+         * windows with negative start times, which is not supported. Instead, they will fall within the [0, timeDifference]
+         * window, and we will update their right windows as new records come in later
+         */
+        private void processEarly(final K key, final V value, final long timestamp, final long closeTime) {
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+            KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = null;
+            boolean rightWinAlreadyCreated = false;
+            final HashSet<Long> windowStartTimes = new HashSet<Long>();
+
+            try (
+                    final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch(
+                            key,
+                            key,
+                            timestamp - 2 * windows.timeDifferenceMs(),
+                            // to catch the current record's right window, if it exists, without more calls to the store
+                            timestamp + 1)
+            ) {
+                KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+                while (iterator.hasNext()) {
+                    next = iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+                    final long startTime = next.key.window().start();
+                    final long endTime = startTime + windows.timeDifferenceMs();
+
+                    if (endTime == windows.timeDifferenceMs()) {
+                        combinedWindow = next;
+                    } else if (endTime > timestamp && startTime <= timestamp) {
+                        rightWinAgg = next.value;
+                        putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
+                    } else {
+                        rightWinAlreadyCreated = true;
+                    }
+                }
+            }
+
+            if (combinedWindow == null) {
+                final TimeWindow window = new TimeWindow(0, windows.timeDifferenceMs());
+                final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
+                putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
+
+            } else {
+                //create the right window for the most recent max timestamp in the combined window

Review comment:
       > most recent max timestamp

   Huh? I think I know what you're trying to say here, but it seems like two different phrases got mixed up.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -210,6 +216,66 @@ public void processInOrder(final K key, final V value, final long timestamp) {
             }
         }
 
+        /**
+         * Created to handle records that have a timestamp > 0 but < timeDifference. These records would create
+         * windows with negative start times, which is not supported. Instead, they will fall within the [0, timeDifference]
+         * window, and we will update their right windows as new records come in later
+         */
+        private void processEarly(final K key, final V value, final long timestamp, final long closeTime) {
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+            KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = null;
+            boolean rightWinAlreadyCreated = false;
+            final HashSet<Long> windowStartTimes = new HashSet<Long>();
+
+            try (
+                    final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch(
+                            key,
+                            key,
+                            timestamp - 2 * windows.timeDifferenceMs(),
+                            // to catch the current record's right window, if it exists, without more calls to the store
+                            timestamp + 1)
+            ) {
+                KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+                while (iterator.hasNext()) {
+                    next = iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+                    final long startTime = next.key.window().start();
+                    final long endTime = startTime + windows.timeDifferenceMs();
+
+                    if (endTime == windows.timeDifferenceMs()) {

Review comment:
       nit: can we make this `if startTime == 0`? That seems slightly easier to understand, and then all the conditionals can be in terms of startTime, which is a bit more intuitive since that's what we're iterating over. Switching back and forth between startTime and endTime makes me lose my train of thought.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -183,7 +188,8 @@ public void processInOrder(final K key, final V value, final long timestamp) {
 
             //create right window for previous record
             if (latestLeftTypeWindow != null) {
-                final long rightWinStart = latestLeftTypeWindow.end() + 1;
+                final long leftWindowEnd = latestLeftTypeWindow.key.window().end();

Review comment:
       This is really just the timestamp of the previous record, right? Can we call it something that reflects that?
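A small sketch of what the rename could look like. It assumes, as the original `latestLeftTypeWindow.end() + 1` suggests, that a left-type window ends exactly at its record's timestamp. `PreviousRecordRightWindow` and `rightWindowFor` are hypothetical names. The right-window bounds mirror the `new TimeWindow(rightWinStart, rightWinStart + windows.timeDifferenceMs())` construction used later in this PR.

```java
/**
 * Hypothetical sketch of the suggested rename. A left-type window is assumed to end
 * exactly at its record's timestamp, so the value is named for what it is.
 */
public final class PreviousRecordRightWindow {

    private PreviousRecordRightWindow() { }

    /** Returns {start, end} of the right window for the record at previousRecordTimestamp. */
    static long[] rightWindowFor(final long previousRecordTimestamp, final long timeDifferenceMs) {
        final long start = previousRecordTimestamp + 1;
        return new long[] {start, start + timeDifferenceMs};
    }

    public static void main(final String[] args) {
        final long leftWindowEnd = 42;                       // end of the latest left-type window
        final long previousRecordTimestamp = leftWindowEnd;  // same value, clearer name
        final long[] bounds = rightWindowFor(previousRecordTimestamp, 10);
        System.out.println(bounds[0] + ".." + bounds[1]);   // prints 43..53
    }
}
```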

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -211,6 +217,67 @@ public void processInOrder(final K key, final V value, final long timestamp) {
             }
         }
 
+        /**
+         * Created to handle records that have a timestamp > 0 but < timeDifference. These records would create
+         * windows with negative start times, which is not supported. Instead, they will fall within the [0, timeDifference]
+         * window, and we will update their right windows as new records come in later
+         */
+        private void processEarly(final K key, final V value, final long timestamp, final long closeTime) {
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+            KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = null;
+            boolean rightWinAlreadyCreated = false;
+            final HashSet<Long> windowStartTimes = new HashSet<Long>();
+
+            try (
+                    final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch(
+                            key,
+                            key,
+                            timestamp - 2 * windows.timeDifferenceMs(),

Review comment:
       I think this fetch might break if you go into the negatives; we should just fetch starting from 0.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -210,6 +216,66 @@ public void processInOrder(final K key, final V value, final long timestamp) {
             }
         }
 
+        /**
+         * Created to handle records that have a timestamp > 0 but < timeDifference. These records would create
+         * windows with negative start times, which is not supported. Instead, they will fall within the [0, timeDifference]
+         * window, and we will update their right windows as new records come in later
+         */
+        private void processEarly(final K key, final V value, final long timestamp, final long closeTime) {
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+            KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = null;

Review comment:
       This is just a window from [0, timeDifferenceMs] that stores the aggregation of all the "early" records, right? I can't think of a more descriptive name, so can we just leave a comment explaining what it's for?

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -210,6 +216,66 @@ public void processInOrder(final K key, final V value, final long timestamp) {
             }
         }
 
+        /**
+         * Created to handle records that have a timestamp > 0 but < timeDifference. These records would create
+         * windows with negative start times, which is not supported. Instead, they will fall within the [0, timeDifference]
+         * window, and we will update their right windows as new records come in later
+         */
+        private void processEarly(final K key, final V value, final long timestamp, final long closeTime) {
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+            KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = null;
+            boolean rightWinAlreadyCreated = false;
+            final HashSet<Long> windowStartTimes = new HashSet<Long>();
+
+            try (
+                    final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch(
+                            key,
+                            key,
+                            timestamp - 2 * windows.timeDifferenceMs(),
+                            // to catch the current record's right window, if it exists, without more calls to the store
+                            timestamp + 1)
+            ) {
+                KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+                while (iterator.hasNext()) {
+                    next = iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+                    final long startTime = next.key.window().start();
+                    final long endTime = startTime + windows.timeDifferenceMs();
+
+                    if (endTime == windows.timeDifferenceMs()) {
+                        combinedWindow = next;
+                    } else if (endTime > timestamp && startTime <= timestamp) {
+                        rightWinAgg = next.value;
+                        putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
+                    } else {
+                        rightWinAlreadyCreated = true;
+                    }
+                }
+            }
+
+            if (combinedWindow == null) {
+                final TimeWindow window = new TimeWindow(0, windows.timeDifferenceMs());
+                final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
+                putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
+
+            } else {
+                //create the right window for the most recent max timestamp in the combined window
+                final long rightWinStart = combinedWindow.value.timestamp() + 1;
+                if (!windowStartTimes.contains(rightWinStart) && combinedWindow.value.timestamp() < timestamp) {
+                    final TimeWindow window = new TimeWindow(rightWinStart, rightWinStart + windows.timeDifferenceMs());
+                    final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
+                    putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
+                }
+                //update the combined window with the new aggregated

Review comment:
       nit: `aggregated` -> `aggregate`

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -210,6 +216,66 @@ public void processInOrder(final K key, final V value, final long timestamp) {
             }
         }
 
+        /**
+         * Created to handle records that have a timestamp > 0 but < timeDifference. These records would create
+         * windows with negative start times, which is not supported. Instead, they will fall within the [0, timeDifference]
+         * window, and we will update their right windows as new records come in later
+         */
+        private void processEarly(final K key, final V value, final long timestamp, final long closeTime) {
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+            KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = null;
+            boolean rightWinAlreadyCreated = false;
+            final HashSet<Long> windowStartTimes = new HashSet<Long>();
+
+            try (
+                    final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch(
+                            key,
+                            key,
+                            timestamp - 2 * windows.timeDifferenceMs(),
+                            // to catch the current record's right window, if it exists, without more calls to the store
+                            timestamp + 1)
+            ) {
+                KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+                while (iterator.hasNext()) {
+                    next = iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+                    final long startTime = next.key.window().start();
+                    final long endTime = startTime + windows.timeDifferenceMs();
+
+                    if (endTime == windows.timeDifferenceMs()) {
+                        combinedWindow = next;
+                    } else if (endTime > timestamp && startTime <= timestamp) {
+                        rightWinAgg = next.value;
+                        putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
+                    } else {
+                        rightWinAlreadyCreated = true;
+                    }
+                }
+            }
+
+            if (combinedWindow == null) {
+                final TimeWindow window = new TimeWindow(0, windows.timeDifferenceMs());
+                final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
+                putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
+
+            } else {
+                //create the right window for the most recent max timestamp in the combined window
+                final long rightWinStart = combinedWindow.value.timestamp() + 1;

Review comment:
       This name keeps throwing me off. Right window of what? Is it something like `previousRecordRightWindow`?

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -210,6 +216,66 @@ public void processInOrder(final K key, final V value, final long timestamp) {
             }
         }
 
+        /**
+         * Created to handle records that have a timestamp > 0 but < timeDifference. These records would create
+         * windows with negative start times, which is not supported. Instead, they will fall within the [0, timeDifference]
+         * window, and we will update their right windows as new records come in later
+         */
+        private void processEarly(final K key, final V value, final long timestamp, final long closeTime) {
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+            KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = null;
+            boolean rightWinAlreadyCreated = false;
+            final HashSet<Long> windowStartTimes = new HashSet<Long>();
+
+            try (
+                    final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch(
+                            key,
+                            key,
+                            timestamp - 2 * windows.timeDifferenceMs(),
+                            // to catch the current record's right window, if it exists, without more calls to the store
+                            timestamp + 1)
+            ) {
+                KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+                while (iterator.hasNext()) {
+                    next = iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+                    final long startTime = next.key.window().start();
+                    final long endTime = startTime + windows.timeDifferenceMs();
+
+                    if (endTime == windows.timeDifferenceMs()) {
+                        combinedWindow = next;
+                    } else if (endTime > timestamp && startTime <= timestamp) {
+                        rightWinAgg = next.value;
+                        putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
+                    } else {
+                        rightWinAlreadyCreated = true;
+                    }
+                }
+            }
+
+            if (combinedWindow == null) {
+                final TimeWindow window = new TimeWindow(0, windows.timeDifferenceMs());
+                final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
+                putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
+
+            } else {
+                //create the right window for the most recent max timestamp in the combined window
+                final long rightWinStart = combinedWindow.value.timestamp() + 1;
+                if (!windowStartTimes.contains(rightWinStart) && combinedWindow.value.timestamp() < timestamp) {

Review comment:
       It's not immediately obvious why this is correct and captures all possible cases, so we should leave a comment, or better yet factor this condition out into a descriptively named method (or, best of all, do both).
   I was concerned about out-of-order records, since in that case the previous record would obviously not be the one with the maximum timestamp in the combined window. But I realized that we never actually need to create a previous record's right window for out-of-order early records, since there's no way for a full timeDifferenceMs to fit between the previous record and whatever the max record in the combined window is. So all we need to do is make sure we only try to create the previous record's right window if the current record is the maximum record within the combined window, i.e. `combinedWindow.value.timestamp() < timestamp`.
   But that's a very long and pretty ineloquent comment to leave in the code. Hopefully you can come up with a more concise explanation.
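A sketch of factoring the condition out, with the reasoning above condensed into the method's Javadoc. `EarlyRightWindowCheck` and `shouldCreatePreviousRecordRightWindow` are hypothetical names. The logic is the same as the PR's `!windowStartTimes.contains(rightWinStart) && combinedWindow.value.timestamp() < timestamp`.

```java
import java.util.Collections;
import java.util.Set;

/** Hypothetical sketch; names are illustrative, not from the PR. */
public final class EarlyRightWindowCheck {

    private EarlyRightWindowCheck() { }

    /**
     * Only the newest record in the combined [0, timeDifferenceMs] window can need the
     * previous record's right window created; for out-of-order early records a full
     * timeDifferenceMs never fits between them and the combined window's max record.
     */
    static boolean shouldCreatePreviousRecordRightWindow(final long combinedWindowMaxTimestamp,
                                                         final long recordTimestamp,
                                                         final Set<Long> existingWindowStarts) {
        final long previousRecordRightWindowStart = combinedWindowMaxTimestamp + 1;
        final boolean recordIsNewestInCombinedWindow = combinedWindowMaxTimestamp < recordTimestamp;
        return recordIsNewestInCombinedWindow
            && !existingWindowStarts.contains(previousRecordRightWindowStart);
    }

    public static void main(final String[] args) {
        System.out.println(shouldCreatePreviousRecordRightWindow(5, 8, Collections.emptySet()));   // true
        System.out.println(shouldCreatePreviousRecordRightWindow(5, 8, Collections.singleton(6L))); // false: exists
        System.out.println(shouldCreatePreviousRecordRightWindow(8, 5, Collections.emptySet()));   // false: out of order
    }
}
```

Naming the intermediate boolean `recordIsNewestInCombinedWindow` carries most of the explanation, so the comment itself can stay short.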




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

