vvcephei commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771746009
########## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java ########## @@ -267,17 +281,41 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener, return result; } + @SuppressWarnings("unchecked") + private <R> QueryResult<R> runKeyQuery(final Query<R> query, + final PositionBound positionBound, + final boolean collectExecutionInfo) { + final QueryResult<R> result; + final KeyQuery<K, V> typedKeyQuery = (KeyQuery<K, V>) query; + final KeyQuery<Bytes, byte[]> rawKeyQuery = + KeyQuery.withKey(keyBytes(typedKeyQuery.getKey())); + final QueryResult<byte[]> rawResult = + wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo); + if (rawResult.isSuccess()) { + final Deserializer<V> deserializer = getValueDeserializer(); + final V value = deserializer.deserialize(serdes.topic(), rawResult.getResult()); + final QueryResult<V> typedQueryResult = + rawResult.swapResult(value); + result = (QueryResult<R>) typedQueryResult; + } else { + // the generic type doesn't matter, since failed queries have no result set. + result = (QueryResult<R>) rawResult; + } + return result; + } + + @SuppressWarnings({"unchecked", "rawtypes"}) private Deserializer<V> getValueDeserializer() { - final Serde<V> vSerde = serdes.valueSerde(); + final Serde<V> valueSerde = serdes.valueSerde(); final boolean timestamped = WrappedStateStore.isTimestamped(wrapped()); final Deserializer<V> deserializer; - if (!timestamped && vSerde instanceof ValueAndTimestampSerde) { + if (!timestamped && valueSerde instanceof ValueAndTimestampSerde) { Review comment: I know it's weird, but it is correct. I would like to revisit it, but I think we really need to do that after the current round of queries is implemented. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org