vvcephei commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771512115
##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
         return false;
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+        final PositionBound positionBound,
+        final boolean collectExecutionInfo) {
+
+        final long start = System.nanoTime();
+        final QueryResult<R> result;
+
+        final QueryHandler handler = queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, collectExecutionInfo);
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " in " + (System.nanoTime() - start) + "ns");
+            }
+        } else {
+            result = (QueryResult<R>) handler.apply(
+                query,
+                positionBound,
+                collectExecutionInfo,
+                this
+            );
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " with serdes "
+                        + serdes + " in " + (System.nanoTime() - start) + "ns");
+            }
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    private <R> QueryResult<R> runKeyQuery(final Query query,
+        final PositionBound positionBound, final boolean collectExecutionInfo) {
+        final QueryResult<R> result;
+        final KeyQuery<K, V> typedQuery = (KeyQuery<K, V>) query;
+        final KeyQuery<Bytes, byte[]> rawKeyQuery = KeyQuery.withKey(keyBytes(typedQuery.getKey()));
+        final QueryResult<byte[]> rawResult =
+            wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo);
+        if (rawResult.isSuccess()) {
+            final boolean timestamped = WrappedStateStore.isTimestamped(wrapped());

Review comment:
I'd forgotten about MeteredTimestampedKeyValueStore, but now that I'm looking at it, what it does is extend the MeteredKeyValueStore, apparently specifically to pad the value serde with a ValueAndTimestamp serde. Otherwise, all the logic lives in MeteredKeyValueStore.
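To make that relationship concrete, here is a minimal, self-contained sketch of the pattern. It does not use the real Kafka Streams classes: `ValueCodec`, `Stamped`, `StampedCodec`, `MeteredStore`, and `MeteredTimestampedStore` are hypothetical stand-ins invented for illustration, and the byte layout (an 8-byte timestamp followed by the value bytes) is an assumption of the sketch. The point is only that the subclass contributes no logic of its own beyond wrapping the value codec.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class SerdePaddingSketch {

    /** Hypothetical stand-in for a value serde: typed value <-> bytes. */
    interface ValueCodec<V> {
        byte[] toBytes(V value);
        V fromBytes(byte[] bytes);
    }

    /** Plain UTF-8 string codec, used as the "inner" value serde. */
    static final class StringCodec implements ValueCodec<String> {
        public byte[] toBytes(String value) {
            return value.getBytes(StandardCharsets.UTF_8);
        }
        public String fromBytes(byte[] bytes) {
            return new String(bytes, StandardCharsets.UTF_8);
        }
    }

    /** Hypothetical stand-in for a value paired with its timestamp. */
    static final class Stamped<V> {
        final V value;
        final long timestamp;
        Stamped(V value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    /** Pads a plain codec: writes the timestamp, then the inner bytes (assumed layout). */
    static final class StampedCodec<V> implements ValueCodec<Stamped<V>> {
        private final ValueCodec<V> plain;
        StampedCodec(ValueCodec<V> plain) {
            this.plain = plain;
        }
        public byte[] toBytes(Stamped<V> stamped) {
            byte[] raw = plain.toBytes(stamped.value);
            return ByteBuffer.allocate(Long.BYTES + raw.length)
                .putLong(stamped.timestamp)
                .put(raw)
                .array();
        }
        public Stamped<V> fromBytes(byte[] bytes) {
            ByteBuffer buffer = ByteBuffer.wrap(bytes);
            long timestamp = buffer.getLong();
            byte[] raw = new byte[buffer.remaining()];
            buffer.get(raw);
            return new Stamped<>(plain.fromBytes(raw), timestamp);
        }
    }

    /** Base store: all put/get logic lives here, parameterized only by the codec. */
    static class MeteredStore<V> {
        private final Map<String, byte[]> inner = new HashMap<>();
        private final ValueCodec<V> codec;
        MeteredStore(ValueCodec<V> codec) {
            this.codec = codec;
        }
        void put(String key, V value) {
            inner.put(key, codec.toBytes(value));
        }
        V get(String key) {
            byte[] raw = inner.get(key);
            return raw == null ? null : codec.fromBytes(raw);
        }
    }

    /** Subclass adds no behavior; it only pads the value codec with a timestamp. */
    static final class MeteredTimestampedStore<V> extends MeteredStore<Stamped<V>> {
        MeteredTimestampedStore(ValueCodec<V> plain) {
            super(new StampedCodec<>(plain));
        }
    }

    public static void main(String[] args) {
        MeteredTimestampedStore<String> store =
            new MeteredTimestampedStore<>(new StringCodec());
        store.put("k", new Stamped<>("v", 42L));
        Stamped<String> out = store.get("k");
        System.out.println(out.value + "@" + out.timestamp);
    }
}
```

In this sketch, anything added to `MeteredStore` (for example, a query method) is inherited unchanged by the timestamped variant, which is why handling the query in the base class is sufficient.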
Also, because we always wrap the non-timestamped store with the `KeyValueToTimestampedKeyValueByteStoreAdapter`, we also always pass through the `MeteredTimestampedKeyValueStore`, whether the inner store is really timestamped or not. I think we could clean this whole hierarchy up a bit, but it's not necessary as part of this work.