ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r476781126



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -148,7 +153,7 @@ public void processInOrder(final K key, final V value, final long timestamp) {
             boolean rightWinAlreadyCreated = false;
 
             // keep the left type window closest to the record
-            Window latestLeftTypeWindow = null;
+            KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> latestLeftTypeWindow = null;
             try (
                     final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch(

Review comment:
       Are you sure it's actually returning something? Have you tested it with 
a RocksDB store, or just with the in-memory store? I think the in-memory store 
would handle this fine since it never serializes the key/timestamps, but with a 
RocksDB store (or a caching layer) the range query works by looking up any data 
between the serialized bounds. Unfortunately, a negative long is 
lexicographically greater than a positive long once serialized to bytes: the 
sign is encoded as a leading 1 bit, and the bytes are compared as unsigned 
values. That means the lower bound ends up being "larger" than the upper bound. 
I assume that would result in no data being returned, but I'm not actually 100% 
sure what would happen.
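
To make the ordering problem concrete, here is a minimal standalone sketch. It is not the actual store code: the class and method names are hypothetical, and it assumes the bounds are serialized as 8-byte big-endian longs and compared byte by byte as unsigned values, which is how I understand the serialized range lookup to behave.

```java
import java.nio.ByteBuffer;

// Hypothetical illustration only; none of these names exist in Kafka Streams.
public class SerializedLongOrder {

    // Serialize a long as 8 big-endian bytes (ByteBuffer's default byte order).
    static byte[] toBytes(final long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    // Lexicographic comparison that treats each byte as unsigned (0..255).
    static int compareLexicographically(final byte[] a, final byte[] b) {
        final int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            final int cmp = Integer.compare(a[i] & 0xff, b[i] & 0xff);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    }

    public static void main(final String[] args) {
        final long lower = -1L;  // e.g. a negative lower time bound
        final long upper = 10L;  // a positive upper time bound

        // Numerically, lower < upper.
        System.out.println(Long.compare(lower, upper) < 0);

        // Serialized, the sign bit makes the negative value sort after the positive one.
        System.out.println(compareLexicographically(toBytes(lower), toBytes(upper)) > 0);
    }
}
```

Both lines print `true`: the bounds are ordered correctly as longs but inverted once serialized. If that is what is happening here, clamping the lower bound to 0 before calling `fetch` should sidestep it.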




