vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771746009



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -267,17 +281,41 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
         return result;
     }
 
+
     @SuppressWarnings("unchecked")
+    private <R> QueryResult<R> runKeyQuery(final Query<R> query,
+                                           final PositionBound positionBound,
+                                           final boolean collectExecutionInfo) {
+        final QueryResult<R> result;
+        final KeyQuery<K, V> typedKeyQuery = (KeyQuery<K, V>) query;
+        final KeyQuery<Bytes, byte[]> rawKeyQuery =
+            KeyQuery.withKey(keyBytes(typedKeyQuery.getKey()));
+        final QueryResult<byte[]> rawResult =
+            wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo);
+        if (rawResult.isSuccess()) {
+            final Deserializer<V> deserializer = getValueDeserializer();
+            final V value = deserializer.deserialize(serdes.topic(), rawResult.getResult());
+            final QueryResult<V> typedQueryResult =
+                rawResult.swapResult(value);
+            result = (QueryResult<R>) typedQueryResult;
+        } else {
+            // the generic type doesn't matter, since failed queries have no result set.
+            result = (QueryResult<R>) rawResult;
+        }
+        return result;
+    }
+
+    @SuppressWarnings({"unchecked", "rawtypes"})
     private Deserializer<V> getValueDeserializer() {
-        final Serde<V> vSerde = serdes.valueSerde();
+        final Serde<V> valueSerde = serdes.valueSerde();
         final boolean timestamped = WrappedStateStore.isTimestamped(wrapped());
         final Deserializer<V> deserializer;
-        if (!timestamped && vSerde instanceof ValueAndTimestampSerde) {
+        if (!timestamped && valueSerde instanceof ValueAndTimestampSerde) {

Review comment:
       I know this check looks weird, but it is correct. I would like to revisit
it, but I think we really need to do that after the current round of queries is
implemented.
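
For readers following along, here is a minimal standalone sketch of the pattern `runKeyQuery` uses in the hunk above: serialize the typed key, query the underlying byte store, deserialize the value only on success, and pass a failed result through with an unchecked cast. It does not use the Kafka classes. `Result`, `RawStore`, and `TypedQuerySketch` are hypothetical stand-ins for `QueryResult`, the wrapped store, and the metered store, and the null guard is an addition of this sketch rather than something shown in the diff.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class TypedQuerySketch {

    // Hypothetical stand-in for QueryResult: holds either a value or a failure message.
    static final class Result<T> {
        private final T value;
        private final String failure;

        private Result(final T value, final String failure) {
            this.value = value;
            this.failure = failure;
        }

        static <T> Result<T> success(final T value) { return new Result<>(value, null); }
        static <T> Result<T> failure(final String message) { return new Result<>(null, message); }

        boolean isSuccess() { return failure == null; }
        T getResult() { return value; }

        // Plays the role of swapResult in the diff: same status, new payload type.
        <U> Result<U> swapResult(final U newValue) { return new Result<>(newValue, failure); }
    }

    // Hypothetical byte-level store; ByteBuffer keys give content-based equality.
    static final class RawStore {
        private final Map<ByteBuffer, byte[]> data = new HashMap<>();
        private final boolean online;

        RawStore(final boolean online) { this.online = online; }

        void put(final byte[] key, final byte[] value) { data.put(ByteBuffer.wrap(key), value); }

        Result<byte[]> query(final byte[] key) {
            if (!online) {
                return Result.failure("store not available");
            }
            return Result.success(data.get(ByteBuffer.wrap(key)));
        }
    }

    @SuppressWarnings("unchecked")
    static <V> Result<V> runKeyQuery(final RawStore store,
                                     final String key,
                                     final Function<byte[], V> deserializer) {
        final Result<byte[]> rawResult = store.query(key.getBytes(StandardCharsets.UTF_8));
        if (rawResult.isSuccess()) {
            final byte[] bytes = rawResult.getResult();
            // Null guard added for this sketch only; a missing key stays null.
            return rawResult.swapResult(bytes == null ? null : deserializer.apply(bytes));
        }
        // A failed result carries no value, so its type parameter does not matter.
        return (Result<V>) rawResult;
    }

    public static void main(final String[] args) {
        final Function<byte[], Integer> toInt =
            bytes -> Integer.parseInt(new String(bytes, StandardCharsets.UTF_8));

        final RawStore store = new RawStore(true);
        store.put("a".getBytes(StandardCharsets.UTF_8), "1".getBytes(StandardCharsets.UTF_8));

        System.out.println(runKeyQuery(store, "a", toInt).getResult());            // 1
        System.out.println(runKeyQuery(new RawStore(false), "a", toInt).isSuccess()); // false
    }
}
```

The cast on the failure path is safe only because nothing ever reads a value out of a failed result, which is the same argument the inline comment in the diff makes.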



