guozhangwang commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771731855



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -267,17 +281,41 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
         return result;
     }
 
+
     @SuppressWarnings("unchecked")
+    private <R> QueryResult<R> runKeyQuery(final Query<R> query,
+                                           final PositionBound positionBound,
+                                           final boolean collectExecutionInfo) {
+        final QueryResult<R> result;
+        final KeyQuery<K, V> typedKeyQuery = (KeyQuery<K, V>) query;
+        final KeyQuery<Bytes, byte[]> rawKeyQuery =
+            KeyQuery.withKey(keyBytes(typedKeyQuery.getKey()));
+        final QueryResult<byte[]> rawResult =
+            wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo);
+        if (rawResult.isSuccess()) {
+            final Deserializer<V> deserializer = getValueDeserializer();
+            final V value = deserializer.deserialize(serdes.topic(), rawResult.getResult());
+            final QueryResult<V> typedQueryResult =
+                rawResult.swapResult(value);
+            result = (QueryResult<R>) typedQueryResult;
+        } else {
+            // the generic type doesn't matter, since failed queries have no result set.
+            result = (QueryResult<R>) rawResult;
+        }
+        return result;
+    }
+
+    @SuppressWarnings({"unchecked", "rawtypes"})
     private Deserializer<V> getValueDeserializer() {
-        final Serde<V> vSerde = serdes.valueSerde();
+        final Serde<V> valueSerde = serdes.valueSerde();
         final boolean timestamped = WrappedStateStore.isTimestamped(wrapped());
         final Deserializer<V> deserializer;
-        if (!timestamped && vSerde instanceof ValueAndTimestampSerde) {
+        if (!timestamped && valueSerde instanceof ValueAndTimestampSerde) {

Review comment:
       This is the part that I'm not completely sure about either... maybe a quick sync on this would be more effective?



