guozhangwang commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771726705



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -253,12 +262,17 @@ public boolean setFlushListener(final 
CacheFlushListener<K, V> listener,
             rawRangeQuery = RangeQuery.withNoBounds();
         }
         final QueryResult<KeyValueIterator<Bytes, byte[]>> rawResult =
-                wrapped().query(rawRangeQuery, positionBound, 
collectExecutionInfo);
+            wrapped().query(rawRangeQuery, positionBound, 
collectExecutionInfo);
         if (rawResult.isSuccess()) {
             final KeyValueIterator<Bytes, byte[]> iterator = 
rawResult.getResult();
             final KeyValueIterator<K, V> resultIterator = new 
MeteredKeyValueTimestampedIterator(
-                    iterator, getSensor, getValueDeserializer());
-            final QueryResult<KeyValueIterator<K, V>> typedQueryResult = 
QueryResult.forResult(resultIterator);
+                iterator,
+                getSensor,
+                getValueDeserializer()
+            );
+            final QueryResult<KeyValueIterator<K, V>> typedQueryResult = 
rawResult.swapResult(
+                resultIterator

Review comment:
       nit: why newline with just one parameter?

##########
File path: streams/src/main/java/org/apache/kafka/streams/query/KeyQuery.java
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+
+/**
+ * Interactive query for retrieving a single record based on its key.
+ */
+@Evolving
+public final class KeyQuery<K, V> implements Query<V> {
+
+    private final K key;
+
+    private KeyQuery(final K key) {
+        this.key = key;
+    }
+
+    /**
+     * Creates a query that will retrieve the record identified by {@code key} 
if it exists
+     * (or {@code null} otherwise).
+     * @param key The key to retrieve
+     * @param <K> The type of the key
+     * @param <V> The type of the value that will be retrieved
+     */
+    public static <K, V> KeyQuery<K, V> withKey(final K key) {
+        return new KeyQuery<>(key);

Review comment:
       Should we check the `key` is not null here? Since in later callers e.g. 
`final KeyQuery<Bytes, byte[]> rawKeyQuery = 
KeyQuery.withKey(keyBytes(typedKeyQuery.getKey()));` we do not check if 
`getKey()` is null or not, and `keyBytes` function could throw if it is.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##########
@@ -162,15 +163,39 @@ public static boolean isPermitted(
             }
             final R result = (R) iterator;
             return QueryResult.forResult(result);
-        } catch (final Throwable t) {
-            final String message = parseStoreException(t, store, query);
+        } catch (final Exception e) {
+            final String message = parseStoreException(e, store, query);
             return QueryResult.forFailure(
                 FailureReason.STORE_EXCEPTION,
                 message
             );
         }
     }
 
+    @SuppressWarnings("unchecked")
+    private static <R> QueryResult<R> runKeyQuery(final Query<R> query,
+                                                  final PositionBound 
positionBound,
+                                                  final boolean 
collectExecutionInfo,
+                                                  final StateStore store) {
+        if (store instanceof KeyValueStore) {
+            final KeyQuery<Bytes, byte[]> rawKeyQuery = (KeyQuery<Bytes, 
byte[]>) query;
+            final KeyValueStore<Bytes, byte[]> keyValueStore =
+                (KeyValueStore<Bytes, byte[]>) store;
+            try {
+                final byte[] bytes = keyValueStore.get(rawKeyQuery.getKey());
+                return (QueryResult<R>) QueryResult.forResult(bytes);

Review comment:
       Should we use `swap` here as well?
   
   Also, I'm feeling maybe we can introduce an internal class extending on 
`KeyQuery<?, byte[]>` and only define the `swap` in that class (see my other 
comment above).

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##########
@@ -162,15 +163,39 @@ public static boolean isPermitted(
             }
             final R result = (R) iterator;
             return QueryResult.forResult(result);
-        } catch (final Throwable t) {
-            final String message = parseStoreException(t, store, query);
+        } catch (final Exception e) {
+            final String message = parseStoreException(e, store, query);
             return QueryResult.forFailure(
                 FailureReason.STORE_EXCEPTION,
                 message
             );
         }
     }
 
+    @SuppressWarnings("unchecked")
+    private static <R> QueryResult<R> runKeyQuery(final Query<R> query,
+                                                  final PositionBound 
positionBound,
+                                                  final boolean 
collectExecutionInfo,
+                                                  final StateStore store) {
+        if (store instanceof KeyValueStore) {
+            final KeyQuery<Bytes, byte[]> rawKeyQuery = (KeyQuery<Bytes, 
byte[]>) query;
+            final KeyValueStore<Bytes, byte[]> keyValueStore =
+                (KeyValueStore<Bytes, byte[]>) store;
+            try {
+                final byte[] bytes = keyValueStore.get(rawKeyQuery.getKey());
+                return (QueryResult<R>) QueryResult.forResult(bytes);
+            } catch (final Exception e) {
+                final String message = parseStoreException(e, store, query);

Review comment:
       Should `parseStoreException`'s first parameter be `Exception` then?

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
##########
@@ -677,18 +638,56 @@ public void shouldHandlePingQuery() {
         }
     }
 
+    public <V> void shouldHandleKeyQuery(
+        final Integer key,
+        final Function<V, Integer> valueExtactor,
+        final Integer expectedValue) {
+
+        final KeyQuery<Integer, V> query = KeyQuery.withKey(key);
+        final StateQueryRequest<V> request =
+            inStore(STORE_NAME)
+                .withQuery(query)
+                .withPartitions(mkSet(0, 1))
+                .withPositionBound(PositionBound.at(INPUT_POSITION));
+
+        final StateQueryResult<V> result =
+            IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
+
+        final QueryResult<V> queryResult =
+            result.getGlobalResult() != null

Review comment:
       I think I'm still a bit confused here about global result, thinking we 
should have not supported this, and hence here it should always be `null`, is 
that right?

##########
File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
##########
@@ -197,6 +197,18 @@ public R getResult() {
         return result;
     }
 
+    @SuppressWarnings("unchecked")
+    public <V> QueryResult<V> swapResult(final V value) {
+        if (isFailure()) {
+            return (QueryResult<V>) this;
+        } else {
+            final QueryResult<V> result = new QueryResult<>(value);

Review comment:
       @vvcephei WDYT about having an extended `QueryResultInBytes` (just a 
placeholder for name) on `QueryResult<byte[]>` and move this function to that 
extended class? This way we can avoid mistakenly using the swap functions.

##########
File path: streams/src/main/java/org/apache/kafka/streams/query/RangeQuery.java
##########
@@ -34,7 +34,7 @@
  * <p>
  */
 @Evolving
-public class RangeQuery<K, V> implements Query<KeyValueIterator<K, V>> {
+public final class RangeQuery<K, V> implements Query<KeyValueIterator<K, V>> {

Review comment:
       Just realized: we call the range query of kv-store `RangeQuery`, and the 
range query of window store `WindowRangeQuery`, and similarly for key queries. 
Is that intentional?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##########
@@ -54,22 +55,22 @@
         );
     }
 
-    private static final Map<Class<?>, QueryHandler> QUERY_HANDLER_MAP =
+    @SuppressWarnings("rawtypes")
+    private static final Map<Class, QueryHandler> QUERY_HANDLER_MAP =
         mkMap(
-            mkEntry(
-                PingQuery.class,
-                (query, positionBound, collectExecutionInfo, store) -> 
QueryResult.forResult(true)
-            ),
             mkEntry(
                 RangeQuery.class,
                 StoreQueryUtils::runRangeQuery
+            ),
+            mkEntry(KeyQuery.class,
+                    StoreQueryUtils::runKeyQuery

Review comment:
       Seems misaligned.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to