guozhangwang commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771731855
########## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java ########## @@ -267,17 +281,41 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener, return result; } + @SuppressWarnings("unchecked") + private <R> QueryResult<R> runKeyQuery(final Query<R> query, + final PositionBound positionBound, + final boolean collectExecutionInfo) { + final QueryResult<R> result; + final KeyQuery<K, V> typedKeyQuery = (KeyQuery<K, V>) query; + final KeyQuery<Bytes, byte[]> rawKeyQuery = + KeyQuery.withKey(keyBytes(typedKeyQuery.getKey())); + final QueryResult<byte[]> rawResult = + wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo); + if (rawResult.isSuccess()) { + final Deserializer<V> deserializer = getValueDeserializer(); + final V value = deserializer.deserialize(serdes.topic(), rawResult.getResult()); + final QueryResult<V> typedQueryResult = + rawResult.swapResult(value); + result = (QueryResult<R>) typedQueryResult; + } else { + // the generic type doesn't matter, since failed queries have no result set. + result = (QueryResult<R>) rawResult; + } + return result; + } + + @SuppressWarnings({"unchecked", "rawtypes"}) private Deserializer<V> getValueDeserializer() { - final Serde<V> vSerde = serdes.valueSerde(); + final Serde<V> valueSerde = serdes.valueSerde(); final boolean timestamped = WrappedStateStore.isTimestamped(wrapped()); final Deserializer<V> deserializer; - if (!timestamped && vSerde instanceof ValueAndTimestampSerde) { + if (!timestamped && valueSerde instanceof ValueAndTimestampSerde) { Review comment: This is the part that I'm not completely sure about either... maybe some quick sync on this would be more effective? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org