spena commented on a change in pull request #11235:
URL: https://github.com/apache/kafka/pull/11235#discussion_r692180473



##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
##########
@@ -214,6 +223,128 @@ public void testLeftJoinDuplicates() {
         }
     }
 
+    @Test
+    public void shouldSendTombstoneForLeftJoinCandidatesRocksDb() {

Review comment:
       Could you add a similar test to the KStreamKStreamOuterJoinTest?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeySchema.java
##########
@@ -72,27 +74,47 @@ public long segmentTimestamp(final Bytes key) {
     }
 
     /**
-     * {@inheritdoc}
+     * {@inheritDoc}
      *
      * This method is optimized for {@link 
RocksDBTimeOrderedWindowStore#all()} only. Key and time
      * range queries are not supported.
      */
     @Override
     public HasNextCondition hasNextCondition(final Bytes binaryKeyFrom, final 
Bytes binaryKeyTo, final long from, final long to) {
-        if (binaryKeyFrom != null || binaryKeyTo != null) {
-            throw new IllegalArgumentException("binaryKeyFrom/binaryKeyTo keys 
cannot be non-null. Key and time range queries are not supported.");
+        if (binaryKeyFrom == null && binaryKeyTo == null && from == 0 && to == 
Long.MAX_VALUE) {
+            return Iterator::hasNext;
         }
 
-        if (from != 0 && to != Long.MAX_VALUE) {
-            throw new IllegalArgumentException("from/to time ranges should be 
0 to Long.MAX_VALUE. Key and time range queries are not supported.");
+        if (binaryKeyFrom != null && binaryKeyFrom.equals(binaryKeyTo) && from 
== to) {
+
+            return iterator -> {
+                while (iterator.hasNext()) {
+                    final Bytes bytes = iterator.peekNextKey();
+                    final Bytes keyBytes = Bytes
+                        
.wrap(TimeOrderedKeySchema.extractStoreKeyBytes(bytes.get()));
+                    final long time = 
TimeOrderedKeySchema.extractStoreTimestamp(bytes.get());
+                    if (keyBytes.compareTo(binaryKeyFrom) >= 0
+                        && keyBytes.compareTo(binaryKeyTo) <= 0
+                        && time >= from
+                        && time <= to) {
+                        return true;
+                    }
+                    iterator.next();
+                }
+                return false;
+            };
         }
 
-        return iterator -> iterator.hasNext();
+        throw new IllegalArgumentException("Key and time range queries are not 
supported.");
     }
 
     @Override
     public <S extends Segment> List<S> segmentsToSearch(final Segments<S> 
segments, final long from, final long to, final boolean forward) {

Review comment:
       Add some tests in TimeOrderedKeySchemaTest.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeySchema.java
##########
@@ -72,27 +74,47 @@ public long segmentTimestamp(final Bytes key) {
     }
 
     /**
-     * {@inheritdoc}
+     * {@inheritDoc}
      *
      * This method is optimized for {@link 
RocksDBTimeOrderedWindowStore#all()} only. Key and time
      * range queries are not supported.
      */
     @Override
     public HasNextCondition hasNextCondition(final Bytes binaryKeyFrom, final 
Bytes binaryKeyTo, final long from, final long to) {

Review comment:
       TimeOrderedKeySchemaTest

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeySchema.java
##########
@@ -72,27 +74,47 @@ public long segmentTimestamp(final Bytes key) {
     }
 
     /**
-     * {@inheritdoc}
+     * {@inheritDoc}
      *
      * This method is optimized for {@link 
RocksDBTimeOrderedWindowStore#all()} only. Key and time
      * range queries are not supported.
      */
     @Override
     public HasNextCondition hasNextCondition(final Bytes binaryKeyFrom, final 
Bytes binaryKeyTo, final long from, final long to) {
-        if (binaryKeyFrom != null || binaryKeyTo != null) {
-            throw new IllegalArgumentException("binaryKeyFrom/binaryKeyTo keys 
cannot be non-null. Key and time range queries are not supported.");
+        if (binaryKeyFrom == null && binaryKeyTo == null && from == 0 && to == 
Long.MAX_VALUE) {
+            return Iterator::hasNext;
         }
 
-        if (from != 0 && to != Long.MAX_VALUE) {
-            throw new IllegalArgumentException("from/to time ranges should be 
0 to Long.MAX_VALUE. Key and time range queries are not supported.");
+        if (binaryKeyFrom != null && binaryKeyFrom.equals(binaryKeyTo) && from 
== to) {
+
+            return iterator -> {
+                while (iterator.hasNext()) {
+                    final Bytes bytes = iterator.peekNextKey();
+                    final Bytes keyBytes = Bytes
+                        
.wrap(TimeOrderedKeySchema.extractStoreKeyBytes(bytes.get()));
+                    final long time = 
TimeOrderedKeySchema.extractStoreTimestamp(bytes.get());
+                    if (keyBytes.compareTo(binaryKeyFrom) >= 0
+                        && keyBytes.compareTo(binaryKeyTo) <= 0
+                        && time >= from
+                        && time <= to) {
+                        return true;
+                    }
+                    iterator.next();
+                }
+                return false;
+            };
         }
 
-        return iterator -> iterator.hasNext();
+        throw new IllegalArgumentException("Key and time range queries are not 
supported.");
     }
 
     @Override
     public <S extends Segment> List<S> segmentsToSearch(final Segments<S> 
segments, final long from, final long to, final boolean forward) {
-        throw new UnsupportedOperationException();
+        if (from != to) {
+            throw new IllegalArgumentException("");

Review comment:
       Add a message of what arguments are illegal?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeySchema.java
##########
@@ -43,12 +45,12 @@
 
     @Override
     public Bytes upperRange(final Bytes key, final long to) {
-        throw new UnsupportedOperationException();
+        return null;

Review comment:
       Should we document in the `KeySchema` interface that upperRange and 
lowerRange may return null when it is not supported by the impl?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
##########
@@ -132,8 +135,18 @@ public void init(final StateStoreContext context,
     public void put(final Bytes key,
                     final byte[] value,
                     final long windowStartTimestamp) {
+        final Collection<Bytes> rawKeys;
+        final WindowStore<Bytes, byte[]> windowStore = wrapped();
+
+        if (value == null && windowStore instanceof RawKeyAccessor) {
+            rawKeys = ((RawKeyAccessor) windowStore).keys(key, 
windowStartTimestamp);

Review comment:
       Do you think it would make the code more clear if we rename the `keys()` 
 to `rawKeys()`? Looking at the `RawKeyAccess`, the `keys()` makes sense. But 
looking at the `RocksDBTimeOrderedWindowStore.keys()`, the name does not tell 
much about what keys.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to