guozhangwang commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771726705
########## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java ########## @@ -253,12 +262,17 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener, rawRangeQuery = RangeQuery.withNoBounds(); } final QueryResult<KeyValueIterator<Bytes, byte[]>> rawResult = - wrapped().query(rawRangeQuery, positionBound, collectExecutionInfo); + wrapped().query(rawRangeQuery, positionBound, collectExecutionInfo); if (rawResult.isSuccess()) { final KeyValueIterator<Bytes, byte[]> iterator = rawResult.getResult(); final KeyValueIterator<K, V> resultIterator = new MeteredKeyValueTimestampedIterator( - iterator, getSensor, getValueDeserializer()); - final QueryResult<KeyValueIterator<K, V>> typedQueryResult = QueryResult.forResult(resultIterator); + iterator, + getSensor, + getValueDeserializer() + ); + final QueryResult<KeyValueIterator<K, V>> typedQueryResult = rawResult.swapResult( + resultIterator Review comment: nit: why newline with just one parameter? ########## File path: streams/src/main/java/org/apache/kafka/streams/query/KeyQuery.java ########## @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.query; + +import org.apache.kafka.common.annotation.InterfaceStability.Evolving; + +/** + * Interactive query for retrieving a single record based on its key. + */ +@Evolving +public final class KeyQuery<K, V> implements Query<V> { + + private final K key; + + private KeyQuery(final K key) { + this.key = key; + } + + /** + * Creates a query that will retrieve the record identified by {@code key} if it exists + * (or {@code null} otherwise). + * @param key The key to retrieve + * @param <K> The type of the key + * @param <V> The type of the value that will be retrieved + */ + public static <K, V> KeyQuery<K, V> withKey(final K key) { + return new KeyQuery<>(key); Review comment: Should we check the `key` is not null here? Since in later callers e.g. `final KeyQuery<Bytes, byte[]> rawKeyQuery = KeyQuery.withKey(keyBytes(typedKeyQuery.getKey()));` we do not check if `getKey()` is null or not, and `keyBytes` function could throw if it is. ########## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java ########## @@ -162,15 +163,39 @@ public static boolean isPermitted( } final R result = (R) iterator; return QueryResult.forResult(result); - } catch (final Throwable t) { - final String message = parseStoreException(t, store, query); + } catch (final Exception e) { + final String message = parseStoreException(e, store, query); return QueryResult.forFailure( FailureReason.STORE_EXCEPTION, message ); } } + @SuppressWarnings("unchecked") + private static <R> QueryResult<R> runKeyQuery(final Query<R> query, + final PositionBound positionBound, + final boolean collectExecutionInfo, + final StateStore store) { + if (store instanceof KeyValueStore) { + final KeyQuery<Bytes, byte[]> rawKeyQuery = (KeyQuery<Bytes, byte[]>) query; + final KeyValueStore<Bytes, byte[]> keyValueStore = + (KeyValueStore<Bytes, byte[]>) store; + try { + final byte[] bytes = keyValueStore.get(rawKeyQuery.getKey()); + return 
(QueryResult<R>) QueryResult.forResult(bytes); Review comment: Should we use `swap` here as well? Also, I'm feeling maybe we can introduce an internal class extending on `KeyQuery<?, byte[]>` and only define the `swap` in that class (see my other comment above). ########## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java ########## @@ -162,15 +163,39 @@ public static boolean isPermitted( } final R result = (R) iterator; return QueryResult.forResult(result); - } catch (final Throwable t) { - final String message = parseStoreException(t, store, query); + } catch (final Exception e) { + final String message = parseStoreException(e, store, query); return QueryResult.forFailure( FailureReason.STORE_EXCEPTION, message ); } } + @SuppressWarnings("unchecked") + private static <R> QueryResult<R> runKeyQuery(final Query<R> query, + final PositionBound positionBound, + final boolean collectExecutionInfo, + final StateStore store) { + if (store instanceof KeyValueStore) { + final KeyQuery<Bytes, byte[]> rawKeyQuery = (KeyQuery<Bytes, byte[]>) query; + final KeyValueStore<Bytes, byte[]> keyValueStore = + (KeyValueStore<Bytes, byte[]>) store; + try { + final byte[] bytes = keyValueStore.get(rawKeyQuery.getKey()); + return (QueryResult<R>) QueryResult.forResult(bytes); + } catch (final Exception e) { + final String message = parseStoreException(e, store, query); Review comment: Should `parseStoreException`'s first parameter be `Exception` then? 
########## File path: streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java ########## @@ -677,18 +638,56 @@ public void shouldHandlePingQuery() { } } + public <V> void shouldHandleKeyQuery( + final Integer key, + final Function<V, Integer> valueExtactor, + final Integer expectedValue) { + + final KeyQuery<Integer, V> query = KeyQuery.withKey(key); + final StateQueryRequest<V> request = + inStore(STORE_NAME) + .withQuery(query) + .withPartitions(mkSet(0, 1)) + .withPositionBound(PositionBound.at(INPUT_POSITION)); + + final StateQueryResult<V> result = + IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request); + + final QueryResult<V> queryResult = + result.getGlobalResult() != null Review comment: I'm still a bit confused about the global result here: I thought we do not support it, and hence it should always be `null` here. Is that right? ########## File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java ########## @@ -197,6 +197,18 @@ public R getResult() { return result; } + @SuppressWarnings("unchecked") + public <V> QueryResult<V> swapResult(final V value) { + if (isFailure()) { + return (QueryResult<V>) this; + } else { + final QueryResult<V> result = new QueryResult<>(value); Review comment: @vvcephei WDYT about having a `QueryResultInBytes` class (just a placeholder name) that extends `QueryResult<byte[]>`, and moving this function to that class? This way we can avoid mistakenly using the swap functions. ########## File path: streams/src/main/java/org/apache/kafka/streams/query/RangeQuery.java ########## @@ -34,7 +34,7 @@ * <p> */ @Evolving -public class RangeQuery<K, V> implements Query<KeyValueIterator<K, V>> { +public final class RangeQuery<K, V> implements Query<KeyValueIterator<K, V>> { Review comment: Just realized: we call the range query of kv-store `RangeQuery`, and the range query of window store `WindowRangeQuery`, and similarly for key queries.
Is that intentional? ########## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java ########## @@ -54,22 +55,22 @@ ); } - private static final Map<Class<?>, QueryHandler> QUERY_HANDLER_MAP = + @SuppressWarnings("rawtypes") + private static final Map<Class, QueryHandler> QUERY_HANDLER_MAP = mkMap( - mkEntry( - PingQuery.class, - (query, positionBound, collectExecutionInfo, store) -> QueryResult.forResult(true) - ), mkEntry( RangeQuery.class, StoreQueryUtils::runRangeQuery + ), + mkEntry(KeyQuery.class, + StoreQueryUtils::runKeyQuery Review comment: nit: this `mkEntry(KeyQuery.class, ...)` entry seems misaligned; the `RangeQuery` entry above puts `mkEntry(` and each argument on its own line. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org