patrickstuedi commented on a change in pull request #11234:
URL: https://github.com/apache/kafka/pull/11234#discussion_r720243284



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
##########
@@ -212,18 +216,15 @@ public void remove(final Windowed<Bytes> sessionKey) {
                                                                   final Bytes keyTo,
                                                                   final long earliestSessionEndTime,
                                                                   final long latestSessionStartTime) {
-        if (keyFrom.compareTo(keyTo) > 0) {
-            LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. " +
-                "This may be due to range arguments set in the wrong order, " +
-                "or serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " +
-                "Note that the built-in numerical serdes do not follow this for negative numbers");
+        if (keyFrom != null && keyTo != null && keyFrom.compareTo(keyTo) > 0) {
+            LOG.warn(INVALID_RANGE_WARN_MSG);
             return KeyValueIterators.emptyIterator();
         }
 
         validateStoreOpen();
 
-        final Bytes cacheKeyFrom = cacheFunction.cacheKey(keySchema.lowerRange(keyFrom, earliestSessionEndTime));
-        final Bytes cacheKeyTo = cacheFunction.cacheKey(keySchema.upperRange(keyTo, latestSessionStartTime));
+        final Bytes cacheKeyFrom = keyFrom == null ? null : cacheFunction.cacheKey(keySchema.lowerRange(keyFrom, earliestSessionEndTime));

Review comment:
       Just curious: would it make sense to make cacheFunction handle null 
keys (returning null in that case)? What do you think?
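
A minimal sketch of the idea, not the actual Kafka code: `mapOrNull` is a hypothetical helper, and the lambda is only a stand-in for the real `cacheFunction.cacheKey(...)` call. It shows the null check living in one place instead of at each call site.

```java
import java.util.function.Function;

final class NullSafeCacheKey {
    // Hypothetical helper (an assumption, not an existing Kafka method):
    // applies the mapping only to non-null keys and passes null through.
    public static <K, R> R mapOrNull(final K key, final Function<K, R> mapper) {
        return key == null ? null : mapper.apply(key);
    }

    public static void main(final String[] args) {
        // Stand-in for cacheFunction.cacheKey(keySchema.lowerRange(...)).
        final Function<String, String> toCacheKey = k -> "segment-0:" + k;

        // A bounded range end is mapped as usual.
        System.out.println(mapOrNull("a", toCacheKey));
        // An open-ended (null) range end stays null.
        System.out.println(mapOrNull(null, toCacheKey));
    }
}
```

Whether this belongs inside cacheFunction itself or in a small wrapper like the one above depends on whether other callers rely on it rejecting null.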

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
##########
@@ -351,10 +345,8 @@ public V fetchSession(final K key, final long earliestSessionEndTime, final long
                                                                  final K keyTo,
                                                                  final long earliestSessionEndTime,
                                                                  final long latestSessionStartTime) {
-        Objects.requireNonNull(keyFrom, "keyFrom cannot be null");
-        Objects.requireNonNull(keyTo, "keyTo cannot be null");
-        final Bytes bytesKeyFrom = keyBytes(keyFrom);
-        final Bytes bytesKeyTo = keyBytes(keyTo);
+        final Bytes bytesKeyFrom = keyFrom == null ? null : keyBytes(keyFrom);

Review comment:
       Similarly here, would it make sense to integrate the null check into 
keyBytes? I think there are similar cases in other stores.
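
Roughly what that could look like, as a sketch under assumptions: the class below is a self-contained stand-in, `serialize` is a hypothetical placeholder for the store's key serde, and the real `keyBytes` in MeteredSessionStore returns `Bytes` rather than `byte[]`.

```java
import java.nio.charset.StandardCharsets;

final class NullTolerantKeyBytes {
    // Hypothetical placeholder for the store's key serializer.
    public static byte[] serialize(final String key) {
        return key.getBytes(StandardCharsets.UTF_8);
    }

    // Null-tolerant variant: a null key (open-ended range bound) is
    // passed through as null instead of being serialized.
    public static byte[] keyBytes(final String key) {
        return key == null ? null : serialize(key);
    }

    public static void main(final String[] args) {
        System.out.println(keyBytes("a").length);   // one UTF-8 byte for "a"
        System.out.println(keyBytes(null) == null); // null passes through
    }
}
```

One caveat with this approach: methods that must still reject null keys would need to keep their own explicit check, since keyBytes would no longer fail on null.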



