mjsax commented on code in PR #14570:
URL: https://github.com/apache/kafka/pull/14570#discussion_r1396762731


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java:
##########
@@ -102,4 +139,216 @@ static class RawAndDeserializedValue<ValueType> {
             this.value = value;
         }
     }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+                                    final PositionBound positionBound,
+                                    final QueryConfig config) {
+
+        final long start = time.nanoseconds();
+        final QueryResult<R> result;
+
+        final StoreQueryUtils.QueryHandler handler = 
queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, config);
+            if (config.isCollectExecutionInfo()) {
+                result.addExecutionInfo(
+                        "Handled in " + getClass() + " in " + 
(time.nanoseconds() - start) + "ns");
+            }
+        } else {
+            result = (QueryResult<R>) handler.apply(
+                    query,
+                    positionBound,
+                    config,
+                    this
+            );
+            if (config.isCollectExecutionInfo()) {
+                result.addExecutionInfo(
+                        "Handled in " + getClass() + " with serdes "
+                                + serdes + " in " + (time.nanoseconds() - 
start) + "ns");
+            }
+        }
+        return result;
+    }
+
+
+
+    @SuppressWarnings("unchecked")
+    private <R> QueryResult<R> runTimestampedKeyQuery(final Query<R> query,
+                                                        final PositionBound 
positionBound,

Review Comment:
   nit: fix indention



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java:
##########
@@ -127,9 +136,25 @@ public <R> QueryResult<R> query(
         final QueryConfig config) {
 
 
+
         final long start = config.isCollectExecutionInfo() ? System.nanoTime() 
: -1L;
-        final QueryResult<R> result = store.query(query, positionBound, 
config);
+        QueryResult<R> result = store.query(query, positionBound, config);
+
+        if (result.isSuccess()) {
+            if (query instanceof KeyQuery || query instanceof 
TimestampedKeyQuery) {
+                final byte[] plainValue = (byte[]) result.getResult();
+                final byte[] valueWithTimestamp = 
convertToTimestampedFormat(plainValue);
+                result = (QueryResult<R>) 
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(result, 
valueWithTimestamp);
+            } else if (query instanceof RangeQuery || query instanceof 
TimestampedRangeQuery) {
+                final KeyValueToTimestampedKeyValueAdapterIterator 
wrappedRocksDBRangeIterator = new 
KeyValueToTimestampedKeyValueAdapterIterator((RocksDbIterator) 
result.getResult());
+                result = (QueryResult<R>) 
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(result, 
wrappedRocksDBRangeIterator);
+            } else {
+                throw new StreamsException("unsupported query type");

Review Comment:
   Let change this to:
   ```
   throw new IllegalArgumentException("Unsupported query type: " + 
query.getClass());
   ```



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java:
##########
@@ -102,4 +139,216 @@ static class RawAndDeserializedValue<ValueType> {
             this.value = value;
         }
     }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+                                    final PositionBound positionBound,
+                                    final QueryConfig config) {
+
+        final long start = time.nanoseconds();
+        final QueryResult<R> result;
+
+        final StoreQueryUtils.QueryHandler handler = 
queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, config);
+            if (config.isCollectExecutionInfo()) {
+                result.addExecutionInfo(
+                        "Handled in " + getClass() + " in " + 
(time.nanoseconds() - start) + "ns");
+            }
+        } else {
+            result = (QueryResult<R>) handler.apply(
+                    query,
+                    positionBound,
+                    config,
+                    this
+            );
+            if (config.isCollectExecutionInfo()) {
+                result.addExecutionInfo(
+                        "Handled in " + getClass() + " with serdes "
+                                + serdes + " in " + (time.nanoseconds() - 
start) + "ns");
+            }
+        }
+        return result;
+    }
+
+
+
+    @SuppressWarnings("unchecked")
+    private <R> QueryResult<R> runTimestampedKeyQuery(final Query<R> query,
+                                                        final PositionBound 
positionBound,
+                                                        final QueryConfig 
config) {
+        final QueryResult<R> result;
+        final TimestampedKeyQuery<K, V> typedKeyQuery = 
(TimestampedKeyQuery<K, V>) query;
+        final KeyQuery<Bytes, byte[]> rawKeyQuery =
+                KeyQuery.withKey(keyBytes(typedKeyQuery.key()));
+        final QueryResult<byte[]> rawResult =
+                wrapped().query(rawKeyQuery, positionBound, config);
+        if (rawResult.isSuccess()) {
+            final Function<byte[], ValueAndTimestamp<V>> deserializer = 
getDeserializeValue(serdes, wrapped());
+            final ValueAndTimestamp<V> valueAndTimestamp = 
deserializer.apply(rawResult.getResult());
+            final QueryResult<ValueAndTimestamp<V>> typedQueryResult =
+                    
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, 
valueAndTimestamp);
+            result = (QueryResult<R>) typedQueryResult;
+        } else {
+            // the generic type doesn't matter, since failed queries have no 
result set.
+            result = (QueryResult<R>) rawResult;
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    private <R> QueryResult<R> runTimestampedRangeQuery(final Query<R> query,
+                                                        final PositionBound 
positionBound,
+                                                        final QueryConfig 
config) {
+
+        final QueryResult<R> result;
+        final TimestampedRangeQuery<K, V> typedQuery = 
(TimestampedRangeQuery<K, V>) query;
+        RangeQuery<Bytes, byte[]> rawRangeQuery;
+        final boolean isKeyAscending = typedQuery.isKeyAscending();
+        rawRangeQuery = RangeQuery.withRange(
+                keyBytes(typedQuery.lowerBound().orElse(null)),
+                keyBytes(typedQuery.upperBound().orElse(null))
+        );
+        if (!isKeyAscending) {
+            rawRangeQuery = rawRangeQuery.withDescendingKeys();
+        }
+        final QueryResult<KeyValueIterator<Bytes, byte[]>> rawResult =
+                wrapped().query(rawRangeQuery, positionBound, config);
+        if (rawResult.isSuccess()) {
+            final KeyValueIterator<Bytes, byte[]> iterator = 
rawResult.getResult();
+            final KeyValueIterator<K, ValueAndTimestamp<V>> resultIterator = 
(KeyValueIterator<K, ValueAndTimestamp<V>>) new 
MeteredTimestampedKeyValueStoreIterator(
+                    iterator,
+                    getSensor,
+                    getDeserializeValue(serdes, wrapped()),
+                    false
+            );
+            final QueryResult<KeyValueIterator<K, ValueAndTimestamp<V>>> 
typedQueryResult =
+                    
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(
+                            rawResult,
+                            resultIterator
+                    );
+            result = (QueryResult<R>) typedQueryResult;
+        } else {
+            // the generic type doesn't matter, since failed queries have no 
result set.
+            result = (QueryResult<R>) rawResult;
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    private <R> QueryResult<R> runKeyQuery(final Query<R> query,
+                                             final PositionBound positionBound,
+                                             final QueryConfig config) {
+        final QueryResult<R> result;
+        final KeyQuery<K, V> typedKeyQuery = (KeyQuery<K, V>) query;
+        final KeyQuery<Bytes, byte[]> rawKeyQuery =
+                KeyQuery.withKey(keyBytes(typedKeyQuery.getKey()));
+        final QueryResult<byte[]> rawResult =
+                wrapped().query(rawKeyQuery, positionBound, config);
+        if (rawResult.isSuccess()) {
+            final Function<byte[], ValueAndTimestamp<V>> deserializer = 
getDeserializeValue(serdes, wrapped());
+            final ValueAndTimestamp<V> valueAndTimestamp = 
deserializer.apply(rawResult.getResult());
+            final V plainValue = valueAndTimestamp == null ? null : 
valueAndTimestamp.value();
+            final QueryResult<V> typedQueryResult =
+                    
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, 
plainValue);
+            result = (QueryResult<R>) typedQueryResult;
+        } else {
+            // the generic type doesn't matter, since failed queries have no 
result set.
+            result = (QueryResult<R>) rawResult;
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    private  <R> QueryResult<R> runRangeQuery(final Query<R> query,
+                                               final PositionBound 
positionBound,
+                                               final QueryConfig config) {
+
+        final QueryResult<R> result;
+        final RangeQuery<K, V> typedQuery = (RangeQuery<K, V>) query;
+        RangeQuery<Bytes, byte[]> rawRangeQuery;
+        final boolean isKeyAscending = typedQuery.isKeyAscending();
+        rawRangeQuery = RangeQuery.withRange(
+                keyBytes(typedQuery.getLowerBound().orElse(null)),
+                keyBytes(typedQuery.getUpperBound().orElse(null))
+        );
+        if (!isKeyAscending) {
+            rawRangeQuery = rawRangeQuery.withDescendingKeys();
+        }
+        final QueryResult<KeyValueIterator<Bytes, byte[]>> rawResult =
+                wrapped().query(rawRangeQuery, positionBound, config);
+        if (rawResult.isSuccess()) {
+            final KeyValueIterator<Bytes, byte[]> iterator = 
rawResult.getResult();
+            final KeyValueIterator<K, V> resultIterator = new 
MeteredTimestampedKeyValueStoreIterator(
+                iterator,
+                getSensor,
+                getDeserializeValue(serdes, wrapped()),
+                true
+            );
+            final QueryResult<KeyValueIterator<K, V>> typedQueryResult =
+                    
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(
+                            rawResult,
+                            resultIterator
+                    );
+            result = (QueryResult<R>) typedQueryResult;
+        } else {
+            // the generic type doesn't matter, since failed queries have no 
result set.
+            result = (QueryResult<R>) rawResult;
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    private class MeteredTimestampedKeyValueStoreIterator implements 
KeyValueIterator<K, V> {
+
+        private final KeyValueIterator<Bytes, byte[]> iter;
+        private final Sensor sensor;
+        private final long startNs;
+        private final Function<byte[], ValueAndTimestamp<V>> valueDeserializer;
+
+        private final boolean isKeyOrRangeQuery;
+
+        private MeteredTimestampedKeyValueStoreIterator(final 
KeyValueIterator<Bytes, byte[]> iter,
+                                                   final Sensor sensor,
+                                                   final Function<byte[], 
ValueAndTimestamp<V>> valueDeserializer,
+                                                   final boolean 
isKeyOrRangeQuery) {

Review Comment:
   `isKeyOrRangeQuery` -> `returnPlainValue`
   
   For KeyQuery we don't create an iterator at all, and `returnPlainValue` is 
more descriptive



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java:
##########
@@ -102,4 +139,216 @@ static class RawAndDeserializedValue<ValueType> {
             this.value = value;
         }
     }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+                                    final PositionBound positionBound,
+                                    final QueryConfig config) {
+
+        final long start = time.nanoseconds();
+        final QueryResult<R> result;
+
+        final StoreQueryUtils.QueryHandler handler = 
queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, config);
+            if (config.isCollectExecutionInfo()) {
+                result.addExecutionInfo(
+                        "Handled in " + getClass() + " in " + 
(time.nanoseconds() - start) + "ns");
+            }
+        } else {
+            result = (QueryResult<R>) handler.apply(
+                    query,
+                    positionBound,
+                    config,
+                    this
+            );
+            if (config.isCollectExecutionInfo()) {
+                result.addExecutionInfo(
+                        "Handled in " + getClass() + " with serdes "
+                                + serdes + " in " + (time.nanoseconds() - 
start) + "ns");
+            }
+        }
+        return result;
+    }
+
+
+
+    @SuppressWarnings("unchecked")
+    private <R> QueryResult<R> runTimestampedKeyQuery(final Query<R> query,
+                                                        final PositionBound 
positionBound,
+                                                        final QueryConfig 
config) {
+        final QueryResult<R> result;
+        final TimestampedKeyQuery<K, V> typedKeyQuery = 
(TimestampedKeyQuery<K, V>) query;
+        final KeyQuery<Bytes, byte[]> rawKeyQuery =
+                KeyQuery.withKey(keyBytes(typedKeyQuery.key()));
+        final QueryResult<byte[]> rawResult =
+                wrapped().query(rawKeyQuery, positionBound, config);
+        if (rawResult.isSuccess()) {
+            final Function<byte[], ValueAndTimestamp<V>> deserializer = 
getDeserializeValue(serdes, wrapped());
+            final ValueAndTimestamp<V> valueAndTimestamp = 
deserializer.apply(rawResult.getResult());
+            final QueryResult<ValueAndTimestamp<V>> typedQueryResult =
+                    
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, 
valueAndTimestamp);
+            result = (QueryResult<R>) typedQueryResult;
+        } else {
+            // the generic type doesn't matter, since failed queries have no 
result set.
+            result = (QueryResult<R>) rawResult;
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    private <R> QueryResult<R> runTimestampedRangeQuery(final Query<R> query,
+                                                        final PositionBound 
positionBound,
+                                                        final QueryConfig 
config) {
+
+        final QueryResult<R> result;
+        final TimestampedRangeQuery<K, V> typedQuery = 
(TimestampedRangeQuery<K, V>) query;
+        RangeQuery<Bytes, byte[]> rawRangeQuery;
+        final boolean isKeyAscending = typedQuery.isKeyAscending();
+        rawRangeQuery = RangeQuery.withRange(
+                keyBytes(typedQuery.lowerBound().orElse(null)),
+                keyBytes(typedQuery.upperBound().orElse(null))
+        );
+        if (!isKeyAscending) {
+            rawRangeQuery = rawRangeQuery.withDescendingKeys();
+        }
+        final QueryResult<KeyValueIterator<Bytes, byte[]>> rawResult =
+                wrapped().query(rawRangeQuery, positionBound, config);
+        if (rawResult.isSuccess()) {
+            final KeyValueIterator<Bytes, byte[]> iterator = 
rawResult.getResult();
+            final KeyValueIterator<K, ValueAndTimestamp<V>> resultIterator = 
(KeyValueIterator<K, ValueAndTimestamp<V>>) new 
MeteredTimestampedKeyValueStoreIterator(
+                    iterator,
+                    getSensor,
+                    getDeserializeValue(serdes, wrapped()),
+                    false
+            );
+            final QueryResult<KeyValueIterator<K, ValueAndTimestamp<V>>> 
typedQueryResult =
+                    
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(
+                            rawResult,
+                            resultIterator
+                    );
+            result = (QueryResult<R>) typedQueryResult;
+        } else {
+            // the generic type doesn't matter, since failed queries have no 
result set.
+            result = (QueryResult<R>) rawResult;
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    private <R> QueryResult<R> runKeyQuery(final Query<R> query,
+                                             final PositionBound positionBound,
+                                             final QueryConfig config) {
+        final QueryResult<R> result;
+        final KeyQuery<K, V> typedKeyQuery = (KeyQuery<K, V>) query;
+        final KeyQuery<Bytes, byte[]> rawKeyQuery =
+                KeyQuery.withKey(keyBytes(typedKeyQuery.getKey()));
+        final QueryResult<byte[]> rawResult =
+                wrapped().query(rawKeyQuery, positionBound, config);
+        if (rawResult.isSuccess()) {
+            final Function<byte[], ValueAndTimestamp<V>> deserializer = 
getDeserializeValue(serdes, wrapped());
+            final ValueAndTimestamp<V> valueAndTimestamp = 
deserializer.apply(rawResult.getResult());
+            final V plainValue = valueAndTimestamp == null ? null : 
valueAndTimestamp.value();
+            final QueryResult<V> typedQueryResult =
+                    
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, 
plainValue);
+            result = (QueryResult<R>) typedQueryResult;
+        } else {
+            // the generic type doesn't matter, since failed queries have no 
result set.
+            result = (QueryResult<R>) rawResult;
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    private  <R> QueryResult<R> runRangeQuery(final Query<R> query,
+                                               final PositionBound 
positionBound,
+                                               final QueryConfig config) {
+
+        final QueryResult<R> result;
+        final RangeQuery<K, V> typedQuery = (RangeQuery<K, V>) query;
+        RangeQuery<Bytes, byte[]> rawRangeQuery;
+        final boolean isKeyAscending = typedQuery.isKeyAscending();
+        rawRangeQuery = RangeQuery.withRange(
+                keyBytes(typedQuery.getLowerBound().orElse(null)),
+                keyBytes(typedQuery.getUpperBound().orElse(null))
+        );
+        if (!isKeyAscending) {
+            rawRangeQuery = rawRangeQuery.withDescendingKeys();
+        }
+        final QueryResult<KeyValueIterator<Bytes, byte[]>> rawResult =
+                wrapped().query(rawRangeQuery, positionBound, config);
+        if (rawResult.isSuccess()) {
+            final KeyValueIterator<Bytes, byte[]> iterator = 
rawResult.getResult();
+            final KeyValueIterator<K, V> resultIterator = new 
MeteredTimestampedKeyValueStoreIterator(
+                iterator,
+                getSensor,
+                getDeserializeValue(serdes, wrapped()),
+                true
+            );
+            final QueryResult<KeyValueIterator<K, V>> typedQueryResult =
+                    
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(
+                            rawResult,
+                            resultIterator
+                    );
+            result = (QueryResult<R>) typedQueryResult;
+        } else {
+            // the generic type doesn't matter, since failed queries have no 
result set.
+            result = (QueryResult<R>) rawResult;
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    private class MeteredTimestampedKeyValueStoreIterator implements 
KeyValueIterator<K, V> {
+
+        private final KeyValueIterator<Bytes, byte[]> iter;
+        private final Sensor sensor;
+        private final long startNs;
+        private final Function<byte[], ValueAndTimestamp<V>> valueDeserializer;
+
+        private final boolean isKeyOrRangeQuery;
+
+        private MeteredTimestampedKeyValueStoreIterator(final 
KeyValueIterator<Bytes, byte[]> iter,
+                                                   final Sensor sensor,
+                                                   final Function<byte[], 
ValueAndTimestamp<V>> valueDeserializer,
+                                                   final boolean 
isKeyOrRangeQuery) {
+            this.iter = iter;
+            this.sensor = sensor;
+            this.valueDeserializer = valueDeserializer;
+            this.startNs = time.nanoseconds();
+            this.isKeyOrRangeQuery = isKeyOrRangeQuery;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return iter.hasNext();
+        }
+
+        @Override
+        public KeyValue<K, V> next() {
+            final KeyValue<Bytes, byte[]> keyValue = iter.next();
+            if (isKeyOrRangeQuery) {
+                final V value = 
valueDeserializer.apply(keyValue.value).value();

Review Comment:
   `value` -> `plainValue`



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java:
##########
@@ -247,26 +247,18 @@ public Position getPosition() {
     }
 
     @SuppressWarnings("unchecked")
-    protected <R> QueryResult<R> runRangeQuery(final Query<R> query,
+    private <R> QueryResult<R> runRangeQuery(final Query<R> query,
                                                final PositionBound 
positionBound,

Review Comment:
   nit: adjust indention



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java:
##########
@@ -102,4 +139,216 @@ static class RawAndDeserializedValue<ValueType> {
             this.value = value;
         }
     }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+                                    final PositionBound positionBound,
+                                    final QueryConfig config) {
+
+        final long start = time.nanoseconds();
+        final QueryResult<R> result;
+
+        final StoreQueryUtils.QueryHandler handler = 
queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, config);
+            if (config.isCollectExecutionInfo()) {
+                result.addExecutionInfo(
+                        "Handled in " + getClass() + " in " + 
(time.nanoseconds() - start) + "ns");
+            }
+        } else {
+            result = (QueryResult<R>) handler.apply(
+                    query,
+                    positionBound,
+                    config,
+                    this
+            );
+            if (config.isCollectExecutionInfo()) {
+                result.addExecutionInfo(
+                        "Handled in " + getClass() + " with serdes "
+                                + serdes + " in " + (time.nanoseconds() - 
start) + "ns");
+            }
+        }
+        return result;
+    }
+
+
+
+    @SuppressWarnings("unchecked")
+    private <R> QueryResult<R> runTimestampedKeyQuery(final Query<R> query,
+                                                        final PositionBound 
positionBound,
+                                                        final QueryConfig 
config) {
+        final QueryResult<R> result;
+        final TimestampedKeyQuery<K, V> typedKeyQuery = 
(TimestampedKeyQuery<K, V>) query;
+        final KeyQuery<Bytes, byte[]> rawKeyQuery =
+                KeyQuery.withKey(keyBytes(typedKeyQuery.key()));
+        final QueryResult<byte[]> rawResult =
+                wrapped().query(rawKeyQuery, positionBound, config);
+        if (rawResult.isSuccess()) {
+            final Function<byte[], ValueAndTimestamp<V>> deserializer = 
getDeserializeValue(serdes, wrapped());
+            final ValueAndTimestamp<V> valueAndTimestamp = 
deserializer.apply(rawResult.getResult());
+            final QueryResult<ValueAndTimestamp<V>> typedQueryResult =
+                    
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, 
valueAndTimestamp);
+            result = (QueryResult<R>) typedQueryResult;
+        } else {
+            // the generic type doesn't matter, since failed queries have no 
result set.
+            result = (QueryResult<R>) rawResult;
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    private <R> QueryResult<R> runTimestampedRangeQuery(final Query<R> query,
+                                                        final PositionBound 
positionBound,
+                                                        final QueryConfig 
config) {
+
+        final QueryResult<R> result;
+        final TimestampedRangeQuery<K, V> typedQuery = 
(TimestampedRangeQuery<K, V>) query;
+        RangeQuery<Bytes, byte[]> rawRangeQuery;
+        final boolean isKeyAscending = typedQuery.isKeyAscending();
+        rawRangeQuery = RangeQuery.withRange(
+                keyBytes(typedQuery.lowerBound().orElse(null)),
+                keyBytes(typedQuery.upperBound().orElse(null))
+        );
+        if (!isKeyAscending) {
+            rawRangeQuery = rawRangeQuery.withDescendingKeys();
+        }
+        final QueryResult<KeyValueIterator<Bytes, byte[]>> rawResult =
+                wrapped().query(rawRangeQuery, positionBound, config);
+        if (rawResult.isSuccess()) {
+            final KeyValueIterator<Bytes, byte[]> iterator = 
rawResult.getResult();
+            final KeyValueIterator<K, ValueAndTimestamp<V>> resultIterator = 
(KeyValueIterator<K, ValueAndTimestamp<V>>) new 
MeteredTimestampedKeyValueStoreIterator(
+                    iterator,
+                    getSensor,
+                    getDeserializeValue(serdes, wrapped()),
+                    false
+            );
+            final QueryResult<KeyValueIterator<K, ValueAndTimestamp<V>>> 
typedQueryResult =
+                    
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(
+                            rawResult,
+                            resultIterator
+                    );
+            result = (QueryResult<R>) typedQueryResult;
+        } else {
+            // the generic type doesn't matter, since failed queries have no 
result set.
+            result = (QueryResult<R>) rawResult;
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    private <R> QueryResult<R> runKeyQuery(final Query<R> query,
+                                             final PositionBound positionBound,
+                                             final QueryConfig config) {
+        final QueryResult<R> result;
+        final KeyQuery<K, V> typedKeyQuery = (KeyQuery<K, V>) query;
+        final KeyQuery<Bytes, byte[]> rawKeyQuery =
+                KeyQuery.withKey(keyBytes(typedKeyQuery.getKey()));
+        final QueryResult<byte[]> rawResult =
+                wrapped().query(rawKeyQuery, positionBound, config);
+        if (rawResult.isSuccess()) {
+            final Function<byte[], ValueAndTimestamp<V>> deserializer = 
getDeserializeValue(serdes, wrapped());
+            final ValueAndTimestamp<V> valueAndTimestamp = 
deserializer.apply(rawResult.getResult());
+            final V plainValue = valueAndTimestamp == null ? null : 
valueAndTimestamp.value();
+            final QueryResult<V> typedQueryResult =
+                    
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, 
plainValue);
+            result = (QueryResult<R>) typedQueryResult;
+        } else {
+            // the generic type doesn't matter, since failed queries have no 
result set.
+            result = (QueryResult<R>) rawResult;
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    private  <R> QueryResult<R> runRangeQuery(final Query<R> query,
+                                               final PositionBound 
positionBound,
+                                               final QueryConfig config) {
+
+        final QueryResult<R> result;
+        final RangeQuery<K, V> typedQuery = (RangeQuery<K, V>) query;
+        RangeQuery<Bytes, byte[]> rawRangeQuery;
+        final boolean isKeyAscending = typedQuery.isKeyAscending();
+        rawRangeQuery = RangeQuery.withRange(
+                keyBytes(typedQuery.getLowerBound().orElse(null)),
+                keyBytes(typedQuery.getUpperBound().orElse(null))
+        );
+        if (!isKeyAscending) {
+            rawRangeQuery = rawRangeQuery.withDescendingKeys();
+        }
+        final QueryResult<KeyValueIterator<Bytes, byte[]>> rawResult =
+                wrapped().query(rawRangeQuery, positionBound, config);
+        if (rawResult.isSuccess()) {
+            final KeyValueIterator<Bytes, byte[]> iterator = 
rawResult.getResult();
+            final KeyValueIterator<K, V> resultIterator = new 
MeteredTimestampedKeyValueStoreIterator(
+                iterator,
+                getSensor,
+                getDeserializeValue(serdes, wrapped()),
+                true
+            );
+            final QueryResult<KeyValueIterator<K, V>> typedQueryResult =
+                    
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(
+                            rawResult,
+                            resultIterator
+                    );
+            result = (QueryResult<R>) typedQueryResult;
+        } else {
+            // the generic type doesn't matter, since failed queries have no 
result set.
+            result = (QueryResult<R>) rawResult;
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    private class MeteredTimestampedKeyValueStoreIterator implements 
KeyValueIterator<K, V> {
+
+        private final KeyValueIterator<Bytes, byte[]> iter;
+        private final Sensor sensor;
+        private final long startNs;
+        private final Function<byte[], ValueAndTimestamp<V>> valueDeserializer;
+
+        private final boolean isKeyOrRangeQuery;
+
+        private MeteredTimestampedKeyValueStoreIterator(final 
KeyValueIterator<Bytes, byte[]> iter,
+                                                   final Sensor sensor,
+                                                   final Function<byte[], 
ValueAndTimestamp<V>> valueDeserializer,

Review Comment:
   `valueDeserializer` -> `valueAndTimestampDeserializer`



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java:
##########
@@ -247,26 +247,18 @@ public Position getPosition() {
     }
 
     @SuppressWarnings("unchecked")
-    protected <R> QueryResult<R> runRangeQuery(final Query<R> query,
+    private <R> QueryResult<R> runRangeQuery(final Query<R> query,
                                                final PositionBound 
positionBound,
                                                final QueryConfig config) {
 
         final QueryResult<R> result;
         final RangeQuery<K, V> typedQuery = (RangeQuery<K, V>) query;
         RangeQuery<Bytes, byte[]> rawRangeQuery;
         final boolean isKeyAscending = typedQuery.isKeyAscending();
-        if (typedQuery.getLowerBound().isPresent() && 
typedQuery.getUpperBound().isPresent()) {
-            rawRangeQuery = RangeQuery.withRange(
-                keyBytes(typedQuery.getLowerBound().get()),
-                keyBytes(typedQuery.getUpperBound().get())
-            );
-        } else if (typedQuery.getLowerBound().isPresent()) {
-            rawRangeQuery = 
RangeQuery.withLowerBound(keyBytes(typedQuery.getLowerBound().get()));
-        } else if (typedQuery.getUpperBound().isPresent()) {
-            rawRangeQuery = 
RangeQuery.withUpperBound(keyBytes(typedQuery.getUpperBound().get()));
-        } else {
-            rawRangeQuery = RangeQuery.withNoBounds();
-        }
+        rawRangeQuery = RangeQuery.withRange(
+                keyBytes(typedQuery.getLowerBound().orElse(null)),
+                keyBytes(typedQuery.getUpperBound().orElse(null))
+        );

Review Comment:
   Thanks for this cleanup. Very nice!



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java:
##########
@@ -195,17 +195,15 @@ public <R> QueryResult<R> query(final Query<R> query, 
final PositionBound positi
             return result;
         }
 
-        @Override
-        protected <R> QueryResult<R> runRangeQuery(final Query<R> query,
-                                                   final PositionBound 
positionBound,
-                                                   final QueryConfig config) {
+        private <R> QueryResult<R> runRangeQuery(final Query<R> query,
+                                                 final PositionBound 
positionBound,
+                                                 final QueryConfig config) {
             // throw exception for now to reserve the ability to implement 
this in the future
             // without clashing with users' custom implementations in the 
meantime
             throw new UnsupportedOperationException("Versioned stores do not 
support RangeQuery queries at this time.");
         }
 
-        @Override
-        protected <R> QueryResult<R> runKeyQuery(final Query<R> query,
+        private <R> QueryResult<R> runKeyQuery(final Query<R> query,
                                                  final PositionBound 
positionBound,

Review Comment:
   nit: adjust indention



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java:
##########
@@ -293,7 +285,7 @@ protected <R> QueryResult<R> runRangeQuery(final Query<R> 
query,
     }
 
     @SuppressWarnings("unchecked")
-    protected <R> QueryResult<R> runKeyQuery(final Query<R> query,
+    private  <R> QueryResult<R> runKeyQuery(final Query<R> query,
                                              final PositionBound positionBound,

Review Comment:
   adjust indention



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java:
##########
@@ -102,4 +139,216 @@ static class RawAndDeserializedValue<ValueType> {
             this.value = value;
         }
     }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+                                    final PositionBound positionBound,
+                                    final QueryConfig config) {
+
+        final long start = time.nanoseconds();
+        final QueryResult<R> result;
+
+        final StoreQueryUtils.QueryHandler handler = 
queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, config);
+            if (config.isCollectExecutionInfo()) {
+                result.addExecutionInfo(
+                        "Handled in " + getClass() + " in " + 
(time.nanoseconds() - start) + "ns");
+            }
+        } else {
+            result = (QueryResult<R>) handler.apply(
+                    query,
+                    positionBound,
+                    config,
+                    this
+            );
+            if (config.isCollectExecutionInfo()) {
+                result.addExecutionInfo(
+                        "Handled in " + getClass() + " with serdes "
+                                + serdes + " in " + (time.nanoseconds() - 
start) + "ns");
+            }
+        }
+        return result;
+    }
+
+
+
+    @SuppressWarnings("unchecked")
+    private <R> QueryResult<R> runTimestampedKeyQuery(final Query<R> query,
+                                                        final PositionBound 
positionBound,
+                                                        final QueryConfig 
config) {
+        final QueryResult<R> result;
+        final TimestampedKeyQuery<K, V> typedKeyQuery = 
(TimestampedKeyQuery<K, V>) query;
+        final KeyQuery<Bytes, byte[]> rawKeyQuery =
+                KeyQuery.withKey(keyBytes(typedKeyQuery.key()));
+        final QueryResult<byte[]> rawResult =
+                wrapped().query(rawKeyQuery, positionBound, config);
+        if (rawResult.isSuccess()) {
+            final Function<byte[], ValueAndTimestamp<V>> deserializer = 
getDeserializeValue(serdes, wrapped());

Review Comment:
   With the fix to add the `TimestampedByteStore` marker interface to the 
adaptor class, I think we can simplify `getDeserializeValue(...)` -- as a 
matter of fact, it should become so simple that we might not need this helper 
any longer at all, and we can pull in the code into the callers?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to