vvcephei commented on a change in pull request #11682:
URL: https://github.com/apache/kafka/pull/11682#discussion_r788231897



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
##########
@@ -145,4 +145,9 @@ public void putAll(final List<KeyValue<Bytes, byte[]>> 
entries) {
     void log(final Bytes key, final byte[] value) {
         context.logChange(name(), key, value, context.timestamp(), position);
     }
+
+    @Override
+    public Position getPosition() {
+        return position;

Review comment:
       Similar concern here... It seems like we have to merge this layer's 
position with the wrapped store's position to compute a correct result for this 
method.

##########
File path: streams/src/main/java/org/apache/kafka/streams/query/KeyQuery.java
##########
@@ -40,7 +42,16 @@ private KeyQuery(final K key) {
      * @param <V> The type of the value that will be retrieved
      */
     public static <K, V> KeyQuery<K, V> withKey(final K key) {
-        return new KeyQuery<>(key);
+        return new KeyQuery<>(key, false);
+    }
+
+    /**
+     * Specifies that the cache should be skipped during query evaluation. 
This means, that the query will always
+     * get forwarded to the underlying store.
+     */
+    @SuppressWarnings("unchecked")
+    public <K, V> KeyQuery<K, V> skipCache() {
+        return new KeyQuery<>((K) key, true);

Review comment:
       ```suggestion
       public KeyQuery<K, V> skipCache() {
           return new KeyQuery<>(key, true);
   ```
   
   I think you won't need the cast or the suppression if you just keep the K 
and V bound to the same values as the current instance (`this`). 

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
##########
@@ -83,10 +90,67 @@ public void init(final StateStoreContext context,
         streamThread = Thread.currentThread();
     }
 
-    Position getPosition() {
+    public Position getPosition() {
         return position;

Review comment:
       Fair enough. Maybe merging the underlying position into the cache's 
position here would be too much, but it does seem necessary to merge them 
together as the result of this method. Otherwise, the cache's position will 
"shadow" the underlying store's, and callers would have no way to learn the 
total position of the layered store.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to