[GitHub] [kafka] mjsax commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


mjsax commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771749966



##
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##
@@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
         return false;
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+                                    final PositionBound positionBound,
+                                    final boolean collectExecutionInfo) {
+
+        final long start = System.nanoTime();
+        final QueryResult<R> result;
+
+        final QueryHandler handler = queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, collectExecutionInfo);
+            if (collectExecutionInfo) {

Review comment:
   Sure.








[GitHub] [kafka] mjsax commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


mjsax commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771749813



##
File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
##
@@ -197,6 +197,18 @@ public R getResult() {
 return result;
 }
 
+    @SuppressWarnings("unchecked")
+    public <V> QueryResult<V> swapResult(final V value) {
+        if (isFailure()) {
+            return (QueryResult<V>) this;
+        } else {
+            final QueryResult<V> result = new QueryResult<>(value);

Review comment:
   I mean, if the upper layers need a `QueryResult<V>` with the proper type, and the inner layers need a `QueryResult<byte[]>`, we should just have two properly typed objects: instead of "swapping", take the `byte[]` from the `QueryResult<byte[]>`, deserialize it, and put the result into the `QueryResult<V>` object.
   
   > but this is the API we agreed on in the KIP.
   
   I did not read the KIP :D (maybe I should have). And we can always adjust it. To me it seems useless to have a generic type parameter if we don't obey it anyway and use casts instead. The whole purpose of generics is to avoid casts, and if they don't avoid casts, there seems to be no point in having them.
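
   For illustration, a minimal sketch of the alternative described above, building a second, properly typed result instead of swapping the value in place. The helper class, method name, and the deserializer argument are hypothetical; only forResult, getResult, getExecutionInfo, and addExecutionInfo are taken from the existing QueryResult API:

   import java.util.function.Function;

   import org.apache.kafka.streams.query.QueryResult;

   // Sketch only: deserialize the raw bytes and build a new, properly typed
   // QueryResult instead of swapping the value inside the existing object.
   // The 'deserializer' parameter stands in for the store's value serde.
   public final class TypedResultSketch {

       public static <V> QueryResult<V> toTypedResult(final QueryResult<byte[]> rawResult,
                                                      final Function<byte[], V> deserializer) {
           final V value = deserializer.apply(rawResult.getResult());
           final QueryResult<V> typedResult = QueryResult.forResult(value);
           // carry over the execution info collected by the lower layers
           for (final String info : rawResult.getExecutionInfo()) {
               typedResult.addExecutionInfo(info);
           }
           return typedResult;
       }
   }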








[GitHub] [kafka] mjsax commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


mjsax commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771733023



##
File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
##
@@ -197,6 +197,18 @@ public R getResult() {
 return result;
 }
 
+    @SuppressWarnings("unchecked")
+    public <V> QueryResult<V> swapResult(final V value) {
+        if (isFailure()) {
+            return (QueryResult<V>) this;

Review comment:
   > I suppose we could throw an IllegalStateException, since the caller 
probably shouldn't be even trying to "swap" the result on a failed result to 
begin with, but this seems fine, too.
   
   If there is no strict reason not to throw an `IllegalStateException`, I would strongly advocate throwing one. It not only guards against potential bugs, but also expresses the semantics much more clearly for developers (i.e., us) and makes the code easier to read and reason about.
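
   For reference, the guard being advocated could look roughly like this. It is a sketch against the method quoted above, not the PR's code; the copying of execution info and position into the new result is elided:

   @SuppressWarnings("unchecked")
   public <V> QueryResult<V> swapResult(final V value) {
       if (isFailure()) {
           // fail fast instead of silently returning the failed result unchanged
           throw new IllegalStateException(
               "Cannot swap the result value of a failed query result: " + this);
       }
       final QueryResult<V> result = new QueryResult<>(value);
       // ... copy execution info and position from `this` into `result`, as the current method does ...
       return result;
   }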









[GitHub] [kafka] mjsax commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


mjsax commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771738427



##
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##
@@ -42,16 +102,21 @@ private StoreQueryUtils() {
         final int partition
     ) {
 
-        final QueryResult<R> result;
         final long start = collectExecutionInfo ? System.nanoTime() : -1L;
-        if (query instanceof PingQuery) {
-            if (!isPermitted(position, positionBound, partition)) {
-                result = QueryResult.notUpToBound(position, positionBound, partition);
-            } else {
-                result = (QueryResult<R>) QueryResult.forResult(true);
-            }
-        } else {
+        final QueryResult<R> result;
+
+        final QueryHandler handler = QUERY_HANDLER_MAP.get(query.getClass());

Review comment:
   🤔 








[GitHub] [kafka] mjsax commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


mjsax commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771738126



##
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##
@@ -16,18 +16,78 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.api.RecordMetadata;
+import org.apache.kafka.streams.query.FailureReason;
+import org.apache.kafka.streams.query.KeyQuery;
 import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.query.PositionBound;
 import org.apache.kafka.streams.query.Query;
 import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueStore;
 
+import java.io.PrintWriter;
+import java.io.StringWriter;
 import java.util.Map;
 
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
 public final class StoreQueryUtils {
 
+    /**
+     * a utility interface to facilitate stores' query dispatch logic,
+     * allowing them to generically store query execution logic as the values
+     * in a map.
+     */
+    @FunctionalInterface
+    public interface QueryHandler {
+        QueryResult<?> apply(
+            final Query<?> query,
+            final PositionBound positionBound,
+            final boolean collectExecutionInfo,
+            final StateStore store
+        );
+    }
+
+
+    @SuppressWarnings("unchecked")
+    private static final Map<Class<?>, QueryHandler> QUERY_HANDLER_MAP =

Review comment:
   Not sure if I fully understand, but might be less important.
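
   For context, a dispatch map like the one declared above could be populated roughly as follows. This is a sketch, not the PR's actual entries: it reuses the mkMap/mkEntry helpers imported in this diff and the PingQuery handling shown in the other StoreQueryUtils hunk, and it omits the position-bound check for brevity:

   private static final Map<Class<?>, QueryHandler> QUERY_HANDLER_MAP =
       mkMap(
           mkEntry(
               PingQuery.class,
               // trivial handler: a ping simply reports that the store is reachable
               (query, positionBound, collectExecutionInfo, store) ->
                   QueryResult.forResult(true)
           )
       );

   Dispatch then becomes a map lookup (QUERY_HANDLER_MAP.get(query.getClass())) instead of an instanceof chain.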








[GitHub] [kafka] mjsax commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


mjsax commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771737792



##
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##
@@ -16,18 +16,78 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.api.RecordMetadata;
+import org.apache.kafka.streams.query.FailureReason;
+import org.apache.kafka.streams.query.KeyQuery;
 import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.query.PositionBound;
 import org.apache.kafka.streams.query.Query;
 import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueStore;
 
+import java.io.PrintWriter;
+import java.io.StringWriter;
 import java.util.Map;
 
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
 public final class StoreQueryUtils {
 
+/**
+ * a utility interface to facilitate stores' query dispatch logic,
+ * allowing them to generically store query execution logic as the values
+ * in a map.
+ */
+@FunctionalInterface

Review comment:
   I see. So we should add it elsewhere, too (of course not as part of the 
IQ work).








[GitHub] [kafka] mjsax commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


mjsax commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771737576



##
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##
@@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
         return false;
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+                                    final PositionBound positionBound,
+                                    final boolean collectExecutionInfo) {
+
+        final long start = System.nanoTime();
+        final QueryResult<R> result;
+
+        final QueryHandler handler = queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, collectExecutionInfo);
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " in " + (System.nanoTime() - start) + "ns");
+            }
+        } else {
+            result = (QueryResult<R>) handler.apply(
+                query,
+                positionBound,
+                collectExecutionInfo,
+                this
+            );
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " with serdes "
+                        + serdes + " in " + (System.nanoTime() - start) + "ns");
+            }
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    private <R> QueryResult<R> runKeyQuery(final Query<R> query,
+        final PositionBound positionBound, final boolean collectExecutionInfo) {
+        final QueryResult<R> result;
+        final KeyQuery<K, V> typedQuery = (KeyQuery<K, V>) query;
+        final KeyQuery<Bytes, byte[]> rawKeyQuery = KeyQuery.withKey(keyBytes(typedQuery.getKey()));
+        final QueryResult<byte[]> rawResult =
+            wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo);
+        if (rawResult.isSuccess()) {
+            final boolean timestamped = WrappedStateStore.isTimestamped(wrapped());

Review comment:
   > Also, because we always wrap the non-timestamped store with the 
KeyValueToTimestampedKeyValueByteStoreAdapter, we also always pass through the 
MeteredTimestampedKeyValue store whether the inner store is really timestamped 
or not.
   
   I don't think so. We only do this in the DSL, but not the PAPI.








[GitHub] [kafka] mjsax commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


mjsax commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771737277



##
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##
@@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
         return false;
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+                                    final PositionBound positionBound,
+                                    final boolean collectExecutionInfo) {
+
+        final long start = System.nanoTime();
+        final QueryResult<R> result;
+
+        final QueryHandler handler = queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, collectExecutionInfo);
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " in " + (System.nanoTime() - start) + "ns");
+            }
+        } else {
+            result = (QueryResult<R>) handler.apply(
+                query,
+                positionBound,
+                collectExecutionInfo,
+                this
+            );
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " with serdes "
+                        + serdes + " in " + (System.nanoTime() - start) + "ns");
+            }
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    private <R> QueryResult<R> runKeyQuery(final Query<R> query,
+        final PositionBound positionBound, final boolean collectExecutionInfo) {
+        final QueryResult<R> result;
+        final KeyQuery<K, V> typedQuery = (KeyQuery<K, V>) query;
+        final KeyQuery<Bytes, byte[]> rawKeyQuery = KeyQuery.withKey(keyBytes(typedQuery.getKey()));
+        final QueryResult<byte[]> rawResult =
+            wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo);
+        if (rawResult.isSuccess()) {
+            final boolean timestamped = WrappedStateStore.isTimestamped(wrapped());
+            final Serde<V> vSerde = serdes.valueSerde();
+            final Deserializer<V> deserializer;
+            if (!timestamped && vSerde instanceof ValueAndTimestampSerde) {

Review comment:
   > The MeteredStore's serde is always a ValueAndTimestamp serde 
regardless of whether the inner store is Timestamped or not. 
   
   Is it? (1) We also have `MeteredTimestampStore` (which of course extends `MeteredStore`), but it seems better to split the logic and move everything timestamp-related into `MeteredTimestampStore`. (2) PAPI users can add a plain `KeyValueStore`, and we won't wrap it with the `TimestampedStore` facade, nor will the serdes be `ValueAndTimestamp`.
   
   > What we do is, when you have a non-timestamped store, we wrap it with an 
extra layer 
(org.apache.kafka.streams.state.internals.KeyValueToTimestampedKeyValueByteStoreAdapter)
 that pads the returned values with a fake timestamp 
(org.apache.kafka.streams.state.TimestampedBytesStore#convertToTimestampedFormat
   
   We only do this in the DSL, if the user gives us a non-timestamped store via `Materialized` -- but for PAPI users, we never do this and use whatever store is given to us as-is.
   
   > so we did not implement the same padding logic for non-timestamped data and instead just bubble up to the MeteredStore
   
   Not sure I can follow; it should not be a concern for IQ, should it? Also, the current conversion between plain/timestamped is really just a corner case (and a case that we want to deprecate anyway -- we just did not find a way to do so yet -- maybe we should add a runtime check at some point and WARN users if they provide a non-timestamped store, until we remove support for it and throw an exception instead...). It does not seem worth adding more tech debt for behavior that we only added to avoid breaking things.
   
   > Which means that if we want to deserialize it, we need to know whether to 
use the ValueAndTimestamp deserializer or just the Value's deserializer.
   
   Yes, but we should split this logic between the plain `MeteredStore` and the 
`MeteredTimestampStore`.
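
   As a concrete example of the PAPI case mentioned above, a plain (non-timestamped) store can be added directly to a Topology; such a store is not wrapped with the timestamped facade, so its value serde is a plain serde rather than a ValueAndTimestamp serde. The store name and serdes below are illustrative only:

   import org.apache.kafka.common.serialization.Serdes;
   import org.apache.kafka.streams.Topology;
   import org.apache.kafka.streams.state.Stores;

   public final class PlainPapiStoreExample {
       public static void main(final String[] args) {
           final Topology topology = new Topology();
           // a plain KeyValueStore added via the PAPI is used as-is (no timestamped wrapper)
           topology.addStateStore(
               Stores.keyValueStoreBuilder(
                   Stores.persistentKeyValueStore("plain-papi-store"),
                   Serdes.String(),
                   Serdes.Long()
               )
           );
       }
   }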








[GitHub] [kafka] mjsax commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


mjsax commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771734218



##
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##
@@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
         return false;
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+                                    final PositionBound positionBound,
+                                    final boolean collectExecutionInfo) {
+
+        final long start = System.nanoTime();
+        final QueryResult<R> result;
+
+        final QueryHandler handler = queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, collectExecutionInfo);
+            if (collectExecutionInfo) {

Review comment:
   We can do this, but wouldn't it be better to say "passed down" rather than "handled" in the message?
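
   Concretely, the suggested wording would change the delegation branch to something like this (sketch only, using the same variables as in the diff above):

   if (collectExecutionInfo) {
       result.addExecutionInfo(
           "Passed down to the wrapped store in " + getClass() + " in "
               + (System.nanoTime() - start) + "ns");
   }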








[GitHub] [kafka] mjsax commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


mjsax commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771733605



##
File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
##
@@ -197,6 +197,18 @@ public R getResult() {
 return result;
 }
 
+    @SuppressWarnings("unchecked")
+    public <V> QueryResult<V> swapResult(final V value) {
+        if (isFailure()) {
+            return (QueryResult<V>) this;
+        } else {
+            final QueryResult<V> result = new QueryResult<>(value);

Review comment:
   Why not use two objects?









[GitHub] [kafka] mjsax commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


mjsax commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r770076908



##
File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
##
@@ -197,6 +197,18 @@ public R getResult() {
 return result;
 }
 
+    @SuppressWarnings("unchecked")
+    public <V> QueryResult<V> swapResult(final V value) {
+        if (isFailure()) {
+            return (QueryResult<V>) this;

Review comment:
   Why do we not allow swapping the result if the current result is a failure? And if we don't want to allow swapping, why just return `this` rather than throw an exception?








[GitHub] [kafka] mjsax commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-15 Thread GitBox


mjsax commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r770076908



##
File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
##
@@ -197,6 +197,18 @@ public R getResult() {
 return result;
 }
 
+    @SuppressWarnings("unchecked")
+    public <V> QueryResult<V> swapResult(final V value) {
+        if (isFailure()) {
+            return (QueryResult<V>) this;

Review comment:
   Why do we not allow swapping the result if the current result is a failure? And if we don't want to allow swapping, why just return `this` rather than throw an exception?

##
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##
@@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
         return false;
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+                                    final PositionBound positionBound,
+                                    final boolean collectExecutionInfo) {
+
+        final long start = System.nanoTime();
+        final QueryResult<R> result;
+
+        final QueryHandler handler = queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, collectExecutionInfo);
+            if (collectExecutionInfo) {

Review comment:
   Does it make sense to add execution info for this case? We pushed the query down but did not handle it, so the inner store would track it (and we should not also track it here)?
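
   A sketch of the behavior this question suggests, where the metered layer only records execution info for queries it actually handles. This is a hypothetical variant, not the PR's code; the variable names come from the diff above:

   if (handler == null) {
       // the query is merely delegated; the wrapped store records its own execution info
       result = wrapped().query(query, positionBound, collectExecutionInfo);
   } else {
       result = (QueryResult<R>) handler.apply(query, positionBound, collectExecutionInfo, this);
       if (collectExecutionInfo) {
           result.addExecutionInfo(
               "Handled in " + getClass() + " in " + (System.nanoTime() - start) + "ns");
       }
   }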

##
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##
@@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
         return false;
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+                                    final PositionBound positionBound,
+                                    final boolean collectExecutionInfo) {
+
+        final long start = System.nanoTime();
+        final QueryResult<R> result;
+
+        final QueryHandler handler = queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, collectExecutionInfo);
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " in " + (System.nanoTime() - start) + "ns");
+            }
+        } else {
+            result = (QueryResult<R>) handler.apply(
+                query,
+                positionBound,
+                collectExecutionInfo,
+                this
+            );
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " with serdes "
+                        + serdes + " in " + (System.nanoTime() - start) + "ns");

Review comment:
   nit: `in` -> `within`
   (also: maybe add a `.` at the end?)

##
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##
@@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
         return false;
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+                                    final PositionBound positionBound,
+                                    final boolean collectExecutionInfo) {
+
+        final long start = System.nanoTime();
+        final QueryResult<R> result;
+
+        final QueryHandler handler = queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, collectExecutionInfo);
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " in " + (System.nanoTime() - start) + "ns");
+            }
+        } else {
+            result = (QueryResult<R>) handler.apply(
+                query,
+                positionBound,
+                collectExecutionInfo,
+                this
+            );
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " with serdes "
+                        + serdes + " in " + (System.nanoTime() - start) + "ns");
+            }
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    private <R> QueryResult<R> runKeyQuery(final Query<R> query,
+        final PositionBound positionBound, final boolean collectExecutionInfo) {
+        final QueryResult<R> result;
+        final KeyQuery<K, V> typedQuery = (KeyQuery<K, V>) query;
+        final KeyQuery<Bytes, byte[]> rawKeyQuery = KeyQuery.withKey(keyBytes(typedQuery.getKey()));
+        final QueryResult<byte[]> rawResult =
+            wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo);
+        if (rawResult.isSuccess()) {
+            final boolean timestamped = WrappedStateStore.isTimestamped(wrapped());