mjsax commented on code in PR #14570:
URL: https://github.com/apache/kafka/pull/14570#discussion_r1379530923


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java:
##########
@@ -127,14 +131,31 @@ public <R> QueryResult<R> query(
         final QueryConfig config) {
 
 
+
         final long start = config.isCollectExecutionInfo() ? System.nanoTime() : -1L;
-        final QueryResult<R> result = store.query(query, positionBound, config);
+        QueryResult<R> result = store.query(query, positionBound, config);
+        final Position position = result.getPosition();
+
+        if (result.isSuccess()) {
+            if (result.getResult() instanceof byte[]) {
+                final byte[] plainValue = (byte[]) result.getResult();
+                final byte[] valueWithTimestamp = convertToTimestampedFormat(plainValue);
+                result = (QueryResult<R>) QueryResult.forResult(valueWithTimestamp);

Review Comment:
   Should we use `InternalQueryResultUtil.copyAndSubstituteDeserializedResult` 
instead, to create a copy of the result correctly?
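
To illustrate the concern, here is a minimal sketch with a stand-in type. `SketchResult` and its `copyAndSubstitute` method are hypothetical and are not Kafka's `QueryResult` or `InternalQueryResultUtil`. The sketch only shows the general idea: building a fresh result from the converted value leaves metadata behind unless it is copied explicitly.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a query result; this is NOT Kafka's QueryResult.
final class SketchResult<R> {

    private final R value;
    private final List<String> executionInfo = new ArrayList<>();
    private String position; // stand-in for a store position

    SketchResult(final R value) {
        this.value = value;
    }

    R value() {
        return value;
    }

    String position() {
        return position;
    }

    List<String> executionInfo() {
        return executionInfo;
    }

    void setPosition(final String position) {
        this.position = position;
    }

    void addExecutionInfo(final String info) {
        executionInfo.add(info);
    }

    // Copies the metadata of 'raw' and swaps in the converted value.
    static <R, V> SketchResult<V> copyAndSubstitute(final SketchResult<R> raw, final V newValue) {
        final SketchResult<V> copy = new SketchResult<>(newValue);
        copy.executionInfo.addAll(raw.executionInfo);
        copy.position = raw.position;
        return copy;
    }
}
```

With a helper of this shape, the caller does not need to remember which pieces of metadata to carry over by hand.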



##########
streams/src/main/java/org/apache/kafka/streams/query/TimestampedKeyQuery.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+
+import java.util.Objects;
+
+/**
+ * Interactive query for retrieving a single record based on its key.
+ */
+@Evolving
+public final class TimestampedKeyQuery<K, V> implements 
Query<ValueAndTimestamp<V>> {
+
+    private final K key;
+    private final boolean skipCache;
+
+    private TimestampedKeyQuery(final K key, final boolean skipCache) {
+        this.key = Objects.requireNonNull(key);

Review Comment:
   I think this check should go into `withKey()` -- we should also add an error 
message
   
   (Might be good to clean this up, inside `KeyQuery`, too)
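
A minimal sketch of what this could look like, using a stand-in class. `SketchKeyQuery` is hypothetical (it is not the PR's `TimestampedKeyQuery`), and the error message text is an assumption chosen for illustration.

```java
import java.util.Objects;

// Hypothetical stand-in for TimestampedKeyQuery; only the null handling is illustrated.
final class SketchKeyQuery<K> {

    private final K key;
    private final boolean skipCache;

    // The private constructor no longer validates; instances are only created
    // through withKey() or from an already validated instance.
    private SketchKeyQuery(final K key, final boolean skipCache) {
        this.key = key;
        this.skipCache = skipCache;
    }

    static <K> SketchKeyQuery<K> withKey(final K key) {
        // The message text is an assumption, not taken from the PR.
        Objects.requireNonNull(key, "the provided key cannot be null");
        return new SketchKeyQuery<>(key, false);
    }

    SketchKeyQuery<K> skipCache() {
        return new SketchKeyQuery<>(key, true);
    }

    K key() {
        return key;
    }

    boolean isSkipCache() {
        return skipCache;
    }
}
```

Validating in the factory keeps the check at the public entry point, so the exception message points at the argument the caller actually passed.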



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java:
##########
@@ -180,5 +201,38 @@ public <PS extends Serializer<P>, P> 
KeyValueIterator<Bytes, byte[]> prefixScan(
     public long approximateNumEntries() {
         return store.approximateNumEntries();
     }
+}
 
+class WrappedRocksDbIterator extends AbstractIterator implements 
ManagedKeyValueIterator {

Review Comment:
   Wondering if we should only `implements ManagedKeyValueIterator<Bytes, 
byte[]>` 🤔 -- what do we gain/need from `AbstractIterator`?
   
   We should for sure set the correct types:
   ```
   ... extends AbstractIterator<KeyValue<Bytes, byte[]>> implements ManagedKeyValueIterator<Bytes, byte[]>
   ```
   to fix the `Object` return type on some methods.
   
   
   
   



##########
streams/src/main/java/org/apache/kafka/streams/query/TimestampedKeyQuery.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+
+import java.util.Objects;
+
+/**
+ * Interactive query for retrieving a single record based on its key.
+ */
+@Evolving
+public final class TimestampedKeyQuery<K, V> implements 
Query<ValueAndTimestamp<V>> {
+
+    private final K key;
+    private final boolean skipCache;
+
+    private TimestampedKeyQuery(final K key, final boolean skipCache) {
+        this.key = Objects.requireNonNull(key);
+        this.skipCache = skipCache;
+    }
+
+    /**
+     * Creates a query that will retrieve the record identified by {@code key} 
if it exists
+     * (or {@code null} otherwise).
+     * @param key The key to retrieve
+     * @param <K> The type of the key
+     * @param <V> The type of the value that will be retrieved
+     */
+    public static <K, V> TimestampedKeyQuery<K, V> withKey(final K key) {
+        return new TimestampedKeyQuery<>(key, false);
+    }
+
+    /**
+     * Specifies that the cache should be skipped during query evaluation. 
This means, that the query will always
+     * get forwarded to the underlying store.
+     */
+    public TimestampedKeyQuery<K, V> skipCache() {
+        return new TimestampedKeyQuery<>(key, true);
+    }
+
+    /**
+     * The key that was specified for this query.
+     */

Review Comment:
   We should add an `@return` tag (I know, it's somewhat redundant, but 
still "cleaner" for the generated JavaDocs to have it) -- same below.



##########
streams/src/main/java/org/apache/kafka/streams/query/RangeQuery.java:
##########
@@ -34,6 +34,8 @@
  * <p>
  *  Keys' order is based on the serialized byte[] of the keys, not the 
'logical' key order.
  * <p>

Review Comment:
   nit: remove unnecessary `<p>` tag



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java:
##########
@@ -180,5 +201,38 @@ public <PS extends Serializer<P>, P> 
KeyValueIterator<Bytes, byte[]> prefixScan(
     public long approximateNumEntries() {
         return store.approximateNumEntries();
     }
+}
 
+class WrappedRocksDbIterator extends AbstractIterator implements 
ManagedKeyValueIterator {

Review Comment:
   Can this class be `private` -- also, should we nest it inside the adapter 
class?
   
   Would also rename it to `KeyValueToTimestampedKeyValueAdapterIterator` -- 
"wrapped" is very generic and not descriptive.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java:
##########
@@ -180,5 +201,38 @@ public <PS extends Serializer<P>, P> 
KeyValueIterator<Bytes, byte[]> prefixScan(
     public long approximateNumEntries() {
         return store.approximateNumEntries();
     }
+}
 
+class WrappedRocksDbIterator extends AbstractIterator implements 
ManagedKeyValueIterator {
+    final private RocksDbIterator rocksDbIterator;
+    public WrappedRocksDbIterator(final RocksDbIterator rocksDbIterator) {

Review Comment:
   nit: add empty lines before and after constructor



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java:
##########
@@ -180,5 +201,38 @@ public <PS extends Serializer<P>, P> 
KeyValueIterator<Bytes, byte[]> prefixScan(
     public long approximateNumEntries() {
         return store.approximateNumEntries();
     }
+}
 
+class WrappedRocksDbIterator extends AbstractIterator implements 
ManagedKeyValueIterator {
+    final private RocksDbIterator rocksDbIterator;
+    public WrappedRocksDbIterator(final RocksDbIterator rocksDbIterator) {
+        this.rocksDbIterator = rocksDbIterator;
+    }
+    @Override
+    protected Object makeNext() {
+        final KeyValue<Bytes, byte[]> next = rocksDbIterator.makeNext();
+        if (next == null) {
+            allDone();
+            return null;
+        }
+        final Bytes key = next.key;
+        final byte[] value = next.value;
+        final byte[] res = convertToTimestampedFormat(value);
+        return KeyValue.pair(key, res);
+    }
+
+    @Override
+    public void close() {

Review Comment:
   We need to call `rocksDbIterator.close()` here -- same for all other public 
methods -- we need to forward the call to the iterator we wrap.



##########
streams/src/main/java/org/apache/kafka/streams/query/TimestampedRangeQuery.java:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.kafka.streams.query;
+
+
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+
+
+import java.util.Optional;
+
+/**
+ * Interactive query for issuing range queries and scans over Timestamped
+ * KeyValue stores.
+ * <p>
+ *  A range query retrieves a set of records, specified using an upper and/or 
lower bound on the keys.
+ * <p>
+ *  A scan query retrieves all records contained in the store.
+ * <p>
+ *  Keys' order is based on the serialized byte[] of the keys, not the 
'logical' key order.
+ * <p>

Review Comment:
   nit: remove unnecessary `<p>` tag



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java:
##########
@@ -180,5 +201,38 @@ public <PS extends Serializer<P>, P> 
KeyValueIterator<Bytes, byte[]> prefixScan(
     public long approximateNumEntries() {
         return store.approximateNumEntries();
     }
+}
 
+class WrappedRocksDbIterator extends AbstractIterator implements 
ManagedKeyValueIterator {
+    final private RocksDbIterator rocksDbIterator;
+    public WrappedRocksDbIterator(final RocksDbIterator rocksDbIterator) {
+        this.rocksDbIterator = rocksDbIterator;
+    }
+    @Override
+    protected Object makeNext() {
+        final KeyValue<Bytes, byte[]> next = rocksDbIterator.makeNext();
+        if (next == null) {
+            allDone();
+            return null;
+        }
+        final Bytes key = next.key;
+        final byte[] value = next.value;
+        final byte[] res = convertToTimestampedFormat(value);
+        return KeyValue.pair(key, res);

Review Comment:
   Seems we can make this a one-liner:
   ```
   return KeyValue.pair(next.key, convertToTimestampedFormat(next.value));
   ```



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java:
##########
@@ -102,4 +139,232 @@ static class RawAndDeserializedValue<ValueType> {
             this.value = value;
         }
     }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+                                    final PositionBound positionBound,
+                                    final QueryConfig config) {
+
+        final long start = time.nanoseconds();
+        final QueryResult<R> result;
+
+        final StoreQueryUtils.QueryHandler handler = 
queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, config);
+            if (config.isCollectExecutionInfo()) {
+                result.addExecutionInfo(
+                        "Handled in " + getClass() + " in " + 
(time.nanoseconds() - start) + "ns");
+            }
+        } else {
+            result = (QueryResult<R>) handler.apply(
+                    query,
+                    positionBound,
+                    config,
+                    this
+            );
+            if (config.isCollectExecutionInfo()) {
+                result.addExecutionInfo(
+                        "Handled in " + getClass() + " with serdes "
+                                + serdes + " in " + (time.nanoseconds() - 
start) + "ns");
+            }
+        }
+        return result;
+    }
+
+
+
+    @SuppressWarnings("unchecked")
+    protected <R> QueryResult<R> runTimestampedKeyQuery(final Query<R> query,

Review Comment:
   Why do we add this as `protected`? Seems it could be `private`? (same below)



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java:
##########
@@ -102,4 +139,232 @@ static class RawAndDeserializedValue<ValueType> {
             this.value = value;
         }
     }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+                                    final PositionBound positionBound,
+                                    final QueryConfig config) {
+
+        final long start = time.nanoseconds();
+        final QueryResult<R> result;
+
+        final StoreQueryUtils.QueryHandler handler = 
queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, config);
+            if (config.isCollectExecutionInfo()) {
+                result.addExecutionInfo(
+                        "Handled in " + getClass() + " in " + 
(time.nanoseconds() - start) + "ns");
+            }
+        } else {
+            result = (QueryResult<R>) handler.apply(
+                    query,
+                    positionBound,
+                    config,
+                    this
+            );
+            if (config.isCollectExecutionInfo()) {
+                result.addExecutionInfo(
+                        "Handled in " + getClass() + " with serdes "
+                                + serdes + " in " + (time.nanoseconds() - 
start) + "ns");
+            }
+        }
+        return result;
+    }
+
+
+
+    @SuppressWarnings("unchecked")
+    protected <R> QueryResult<R> runTimestampedKeyQuery(final Query<R> query,
+                                                      final PositionBound 
positionBound,

Review Comment:
   nit: fix indentation



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java:
##########
@@ -102,4 +139,232 @@ static class RawAndDeserializedValue<ValueType> {
             this.value = value;
         }
     }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+                                    final PositionBound positionBound,
+                                    final QueryConfig config) {
+
+        final long start = time.nanoseconds();
+        final QueryResult<R> result;
+
+        final StoreQueryUtils.QueryHandler handler = 
queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, config);
+            if (config.isCollectExecutionInfo()) {
+                result.addExecutionInfo(
+                        "Handled in " + getClass() + " in " + 
(time.nanoseconds() - start) + "ns");
+            }
+        } else {
+            result = (QueryResult<R>) handler.apply(
+                    query,
+                    positionBound,
+                    config,
+                    this
+            );
+            if (config.isCollectExecutionInfo()) {
+                result.addExecutionInfo(
+                        "Handled in " + getClass() + " with serdes "
+                                + serdes + " in " + (time.nanoseconds() - 
start) + "ns");
+            }
+        }
+        return result;
+    }
+
+
+
+    @SuppressWarnings("unchecked")
+    protected <R> QueryResult<R> runTimestampedKeyQuery(final Query<R> query,
+                                                      final PositionBound 
positionBound,
+                                                      final QueryConfig 
config) {
+        final QueryResult<R> result;
+        final TimestampedKeyQuery<K, V> typedKeyQuery = 
(TimestampedKeyQuery<K, V>) query;
+        final KeyQuery<Bytes, byte[]> rawKeyQuery =
+                KeyQuery.withKey(keyBytes(typedKeyQuery.key()));
+        final QueryResult<byte[]> rawResult =
+                wrapped().query(rawKeyQuery, positionBound, config);
+        if (rawResult.isSuccess()) {
+            final Function<byte[], ValueAndTimestamp<V>> deserializer = 
getDeserializeValue(serdes, wrapped());
+            final ValueAndTimestamp<V> value = 
deserializer.apply(rawResult.getResult());
+            final QueryResult<ValueAndTimestamp<V>> typedQueryResult =
+                    
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, value);
+            result = (QueryResult<R>) typedQueryResult;
+        } else {
+            // the generic type doesn't matter, since failed queries have no 
result set.
+            result = (QueryResult<R>) rawResult;
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    protected <R> QueryResult<R> runTimestampedRangeQuery(final Query<R> query,
+                                                        final PositionBound 
positionBound,
+                                                        final QueryConfig 
config) {
+
+        final QueryResult<R> result;
+        final TimestampedRangeQuery<K, V> typedQuery = 
(TimestampedRangeQuery<K, V>) query;
+        RangeQuery<Bytes, byte[]> rawRangeQuery;
+        final boolean isKeyAscending = typedQuery.isKeyAscending();
+        if (typedQuery.lowerBound().isPresent() && 
typedQuery.upperBound().isPresent()) {
+            rawRangeQuery = RangeQuery.withRange(

Review Comment:
   Given that we can pass in `null` for upper and/or lower bound (with 
semantics "no bound") I am wondering if we can simplify this whole if/else 
stuff here?
   ```
   rawRangeQuery = RangeQuery.withRange(
       keyBytes(typedQuery.lowerBound().orElse(null)),
       keyBytes(typedQuery.upperBound().orElse(null))
   );
   ```
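
A self-contained sketch of the same idea with stand-in types. `SketchRange` and `convert` are hypothetical; `convert` plays the role of `keyBytes` and is assumed to pass `null` through unchanged, which the simplification depends on.

```java
import java.util.Optional;

// Hypothetical stand-in for a range over keys; a null bound means "no bound".
final class SketchRange {

    final String lower;
    final String upper;

    private SketchRange(final String lower, final String upper) {
        this.lower = lower;
        this.upper = upper;
    }

    static SketchRange withRange(final String lower, final String upper) {
        return new SketchRange(lower, upper);
    }

    // Stand-in for keyBytes(); assumed here to pass null through unchanged.
    static String convert(final String key) {
        return key == null ? null : "raw:" + key;
    }

    // Branching version, mirroring the shape of the if/else chain in the diff.
    static SketchRange branching(final Optional<String> lower, final Optional<String> upper) {
        if (lower.isPresent() && upper.isPresent()) {
            return withRange(convert(lower.get()), convert(upper.get()));
        } else if (lower.isPresent()) {
            return withRange(convert(lower.get()), null);
        } else if (upper.isPresent()) {
            return withRange(null, convert(upper.get()));
        } else {
            return withRange(null, null);
        }
    }

    // Simplified version: an absent bound becomes null in a single expression.
    static SketchRange simplified(final Optional<String> lower, final Optional<String> upper) {
        return withRange(convert(lower.orElse(null)), convert(upper.orElse(null)));
    }
}
```

Both methods produce the same bounds for all four combinations of present and absent bounds, as long as the conversion step tolerates `null`.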



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java:
##########
@@ -180,5 +201,38 @@ public <PS extends Serializer<P>, P> 
KeyValueIterator<Bytes, byte[]> prefixScan(
     public long approximateNumEntries() {
         return store.approximateNumEntries();
     }
+}
 
+class WrappedRocksDbIterator extends AbstractIterator implements 
ManagedKeyValueIterator {
+    final private RocksDbIterator rocksDbIterator;
+    public WrappedRocksDbIterator(final RocksDbIterator rocksDbIterator) {
+        this.rocksDbIterator = rocksDbIterator;
+    }
+    @Override
+    protected Object makeNext() {
+        final KeyValue<Bytes, byte[]> next = rocksDbIterator.makeNext();

Review Comment:
   Given that we _wrap_ `RocksDbIterator`, we should not call `makeNext` 
directly? Instead, we should use public methods only, i.e., `hasNext()` and 
`next()`.
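
A minimal sketch of a wrapper that relies only on the public methods of the iterator it wraps, and forwards `close()` as well. All types here are hypothetical stand-ins built on `java.util.Iterator`; none of them is `RocksDbIterator` or `ManagedKeyValueIterator`.

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for a store iterator that owns a resource.
interface CloseableIterator<T> extends Iterator<T>, AutoCloseable {
    @Override
    void close(); // narrowed so that close() throws no checked exception
}

// Hypothetical in-memory iterator, used only to exercise the wrapper.
final class InMemoryIterator<T> implements CloseableIterator<T> {

    private final Iterator<T> delegate;
    private boolean closed = false;

    InMemoryIterator(final List<T> items) {
        this.delegate = items.iterator();
    }

    @Override
    public boolean hasNext() {
        return delegate.hasNext();
    }

    @Override
    public T next() {
        return delegate.next();
    }

    @Override
    public void close() {
        closed = true;
    }

    boolean isClosed() {
        return closed;
    }
}

// Wrapper that converts each element and forwards every call to the wrapped iterator.
final class ConvertingIterator<T> implements CloseableIterator<T> {

    private final CloseableIterator<T> wrapped;
    private final UnaryOperator<T> converter;

    ConvertingIterator(final CloseableIterator<T> wrapped, final UnaryOperator<T> converter) {
        this.wrapped = wrapped;
        this.converter = converter;
    }

    @Override
    public boolean hasNext() {
        return wrapped.hasNext();
    }

    @Override
    public T next() {
        // Only the public next() of the wrapped iterator is used.
        return converter.apply(wrapped.next());
    }

    @Override
    public void close() {
        // Forward the call so the wrapped iterator can release its resources.
        wrapped.close();
    }
}
```

In this shape the wrapper does not need a base class of its own, because the wrapped iterator already tracks its state.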



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java:
##########
@@ -102,4 +139,232 @@ static class RawAndDeserializedValue<ValueType> {
             this.value = value;
         }
     }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+                                    final PositionBound positionBound,
+                                    final QueryConfig config) {
+
+        final long start = time.nanoseconds();
+        final QueryResult<R> result;
+
+        final StoreQueryUtils.QueryHandler handler = 
queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, config);
+            if (config.isCollectExecutionInfo()) {
+                result.addExecutionInfo(
+                        "Handled in " + getClass() + " in " + 
(time.nanoseconds() - start) + "ns");
+            }
+        } else {
+            result = (QueryResult<R>) handler.apply(
+                    query,
+                    positionBound,
+                    config,
+                    this
+            );
+            if (config.isCollectExecutionInfo()) {
+                result.addExecutionInfo(
+                        "Handled in " + getClass() + " with serdes "
+                                + serdes + " in " + (time.nanoseconds() - 
start) + "ns");
+            }
+        }
+        return result;
+    }
+
+
+
+    @SuppressWarnings("unchecked")
+    protected <R> QueryResult<R> runTimestampedKeyQuery(final Query<R> query,
+                                                      final PositionBound 
positionBound,
+                                                      final QueryConfig 
config) {
+        final QueryResult<R> result;
+        final TimestampedKeyQuery<K, V> typedKeyQuery = 
(TimestampedKeyQuery<K, V>) query;
+        final KeyQuery<Bytes, byte[]> rawKeyQuery =
+                KeyQuery.withKey(keyBytes(typedKeyQuery.key()));
+        final QueryResult<byte[]> rawResult =
+                wrapped().query(rawKeyQuery, positionBound, config);
+        if (rawResult.isSuccess()) {
+            final Function<byte[], ValueAndTimestamp<V>> deserializer = 
getDeserializeValue(serdes, wrapped());
+            final ValueAndTimestamp<V> value = 
deserializer.apply(rawResult.getResult());

Review Comment:
   nit: `value` -> `valueAndTimestamp`



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java:
##########
@@ -102,4 +139,232 @@ static class RawAndDeserializedValue<ValueType> {
             this.value = value;
         }
     }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+                                    final PositionBound positionBound,
+                                    final QueryConfig config) {
+
+        final long start = time.nanoseconds();
+        final QueryResult<R> result;
+
+        final StoreQueryUtils.QueryHandler handler = 
queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, config);
+            if (config.isCollectExecutionInfo()) {
+                result.addExecutionInfo(
+                        "Handled in " + getClass() + " in " + 
(time.nanoseconds() - start) + "ns");
+            }
+        } else {
+            result = (QueryResult<R>) handler.apply(
+                    query,
+                    positionBound,
+                    config,
+                    this
+            );
+            if (config.isCollectExecutionInfo()) {
+                result.addExecutionInfo(
+                        "Handled in " + getClass() + " with serdes "
+                                + serdes + " in " + (time.nanoseconds() - 
start) + "ns");
+            }
+        }
+        return result;
+    }
+
+
+
+    @SuppressWarnings("unchecked")
+    protected <R> QueryResult<R> runTimestampedKeyQuery(final Query<R> query,
+                                                      final PositionBound 
positionBound,
+                                                      final QueryConfig 
config) {
+        final QueryResult<R> result;
+        final TimestampedKeyQuery<K, V> typedKeyQuery = 
(TimestampedKeyQuery<K, V>) query;
+        final KeyQuery<Bytes, byte[]> rawKeyQuery =
+                KeyQuery.withKey(keyBytes(typedKeyQuery.key()));
+        final QueryResult<byte[]> rawResult =
+                wrapped().query(rawKeyQuery, positionBound, config);
+        if (rawResult.isSuccess()) {
+            final Function<byte[], ValueAndTimestamp<V>> deserializer = 
getDeserializeValue(serdes, wrapped());
+            final ValueAndTimestamp<V> value = 
deserializer.apply(rawResult.getResult());
+            final QueryResult<ValueAndTimestamp<V>> typedQueryResult =
+                    
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, value);
+            result = (QueryResult<R>) typedQueryResult;
+        } else {
+            // the generic type doesn't matter, since failed queries have no 
result set.
+            result = (QueryResult<R>) rawResult;
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    protected <R> QueryResult<R> runTimestampedRangeQuery(final Query<R> query,
+                                                        final PositionBound 
positionBound,
+                                                        final QueryConfig 
config) {
+
+        final QueryResult<R> result;
+        final TimestampedRangeQuery<K, V> typedQuery = 
(TimestampedRangeQuery<K, V>) query;
+        RangeQuery<Bytes, byte[]> rawRangeQuery;
+        final boolean isKeyAscending = typedQuery.isKeyAscending();
+        if (typedQuery.lowerBound().isPresent() && 
typedQuery.upperBound().isPresent()) {
+            rawRangeQuery = RangeQuery.withRange(
+                    keyBytes(typedQuery.lowerBound().get()),
+                    keyBytes(typedQuery.upperBound().get())
+            );
+        } else if (typedQuery.lowerBound().isPresent()) {
+            rawRangeQuery = 
RangeQuery.withLowerBound(keyBytes(typedQuery.lowerBound().get()));
+        } else if (typedQuery.upperBound().isPresent()) {
+            rawRangeQuery = 
RangeQuery.withUpperBound(keyBytes(typedQuery.upperBound().get()));
+        } else {
+            rawRangeQuery = RangeQuery.withNoBounds();
+        }
+        if (!isKeyAscending) {
+            rawRangeQuery = rawRangeQuery.withDescendingKeys();
+        }
+        final QueryResult<KeyValueIterator<Bytes, byte[]>> rawResult =
+                wrapped().query(rawRangeQuery, positionBound, config);
+        if (rawResult.isSuccess()) {
+            final KeyValueIterator<Bytes, byte[]> iterator = 
rawResult.getResult();
+            final KeyValueIterator<K, V> resultIterator = new 
MeteredKeyValueTimestampedIterator(
+                    iterator,
+                    getSensor,
+                    getDeserializeValue(serdes, wrapped()),
+                    false
+            );
+            final QueryResult<KeyValueIterator<K, V>> typedQueryResult =
+                    
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(
+                            rawResult,
+                            resultIterator
+                    );
+            result = (QueryResult<R>) typedQueryResult;
+        } else {
+            // the generic type doesn't matter, since failed queries have no 
result set.
+            result = (QueryResult<R>) rawResult;
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    protected <R> QueryResult<R> runKeyQuery(final Query<R> query,
+                                             final PositionBound positionBound,
+                                             final QueryConfig config) {
+        final QueryResult<R> result;
+        final KeyQuery<K, V> typedKeyQuery = (KeyQuery<K, V>) query;
+        final KeyQuery<Bytes, byte[]> rawKeyQuery =
+                KeyQuery.withKey(keyBytes(typedKeyQuery.getKey()));
+        final QueryResult<byte[]> rawResult =
+                wrapped().query(rawKeyQuery, positionBound, config);
+        if (rawResult.isSuccess()) {
+            final Function<byte[], ValueAndTimestamp<V>> deserializer = 
getDeserializeValue(serdes, wrapped());
+            final ValueAndTimestamp<V> valueAndTimestamp = 
deserializer.apply(rawResult.getResult());
+            final V plainValue = valueAndTimestamp == null ? null : 
valueAndTimestamp.value();
+            final QueryResult<V> typedQueryResult =
+                    
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, 
plainValue);
+            result = (QueryResult<R>) typedQueryResult;
+        } else {
+            // the generic type doesn't matter, since failed queries have no 
result set.
+            result = (QueryResult<R>) rawResult;
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    protected <R> QueryResult<R> runRangeQuery(final Query<R> query,
+                                               final PositionBound 
positionBound,
+                                               final QueryConfig config) {
+
+        final QueryResult<R> result;
+        final RangeQuery<K, V> typedQuery = (RangeQuery<K, V>) query;
+        RangeQuery<Bytes, byte[]> rawRangeQuery;
+        final boolean isKeyAscending = typedQuery.isKeyAscending();
+        if (typedQuery.getLowerBound().isPresent() && 
typedQuery.getUpperBound().isPresent()) {
+            rawRangeQuery = RangeQuery.withRange(
+                    keyBytes(typedQuery.getLowerBound().get()),
+                    keyBytes(typedQuery.getUpperBound().get())
+            );
+        } else if (typedQuery.getLowerBound().isPresent()) {
+            rawRangeQuery = 
RangeQuery.withLowerBound(keyBytes(typedQuery.getLowerBound().get()));
+        } else if (typedQuery.getUpperBound().isPresent()) {
+            rawRangeQuery = 
RangeQuery.withUpperBound(keyBytes(typedQuery.getUpperBound().get()));
+        } else {
+            rawRangeQuery = RangeQuery.withNoBounds();
+        }
+        if (!isKeyAscending) {
+            rawRangeQuery = rawRangeQuery.withDescendingKeys();
+        }
+        final QueryResult<KeyValueIterator<Bytes, byte[]>> rawResult =
+                wrapped().query(rawRangeQuery, positionBound, config);
+        if (rawResult.isSuccess()) {
+            final KeyValueIterator<Bytes, byte[]> iterator = 
rawResult.getResult();
+            final KeyValueIterator<K, V> resultIterator = new 
MeteredKeyValueTimestampedIterator(

Review Comment:
   For `RangeQuery` we should not return `ValueAndTimestamp`, right? So why are 
we using `MeteredKeyValueTimestampedIterator` -- where does the conversion 
from `ValueAndTimestamp` to "plain value" happen?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java:
##########
@@ -102,4 +140,230 @@ static class RawAndDeserializedValue<ValueType> {
             this.value = value;
         }
     }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+                                    final PositionBound positionBound,
+                                    final QueryConfig config) {
+
+        final long start = time.nanoseconds();
+        final QueryResult<R> result;
+
+        final StoreQueryUtils.QueryHandler handler = 
queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, config);
+            if (config.isCollectExecutionInfo()) {
+                result.addExecutionInfo(
+                        "Handled in " + getClass() + " in " + 
(time.nanoseconds() - start) + "ns");
+            }
+        } else {
+            result = (QueryResult<R>) handler.apply(
+                    query,
+                    positionBound,
+                    config,
+                    this
+            );
+            if (config.isCollectExecutionInfo()) {
+                result.addExecutionInfo(
+                        "Handled in " + getClass() + " with serdes "
+                                + serdes + " in " + (time.nanoseconds() - 
start) + "ns");
+            }
+        }
+        return result;
+    }
+
+
+
+    @SuppressWarnings("unchecked")
+    protected <R> QueryResult<R> runTimestampKeyQuery(final Query<R> query,
+                                                      final PositionBound 
positionBound,
+                                                      final QueryConfig 
config) {
+        final QueryResult<R> result;
+        final TimestampedKeyQuery<K, V> typedKeyQuery = 
(TimestampedKeyQuery<K, V>) query;
+        final KeyQuery<Bytes, byte[]> rawKeyQuery =
+                KeyQuery.withKey(keyBytes(typedKeyQuery.getKey()));
+        final QueryResult<byte[]> rawResult =
+                wrapped().query(rawKeyQuery, positionBound, config);
+        if (rawResult.isSuccess()) {
+            final Function<byte[], ValueAndTimestamp<V>> deserializer = 
getDeserializeValue(serdes, wrapped());
+            final ValueAndTimestamp<V> value = 
deserializer.apply(rawResult.getResult());
+            final QueryResult<ValueAndTimestamp<V>> typedQueryResult =
+                    
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, value);
+            result = (QueryResult<R>) typedQueryResult;
+        } else {
+            // the generic type doesn't matter, since failed queries have no 
result set.
+            result = (QueryResult<R>) rawResult;
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    protected <R> QueryResult<R> runTimestampRangeQuery(final Query<R> query,
+                                                        final PositionBound 
positionBound,
+                                                        final QueryConfig 
config) {
+
+        final QueryResult<R> result;
+        final TimestampedRangeQuery<K, V> typedQuery = 
(TimestampedRangeQuery<K, V>) query;
+
+        final RangeQuery<Bytes, byte[]> rawRangeQuery;
+        if (typedQuery.getLowerBound().isPresent() && 
typedQuery.getUpperBound().isPresent()) {
+            rawRangeQuery = RangeQuery.withRange(
+                    keyBytes(typedQuery.getLowerBound().get()),
+                    keyBytes(typedQuery.getUpperBound().get())
+            );
+        } else if (typedQuery.getLowerBound().isPresent()) {
+            rawRangeQuery = 
RangeQuery.withLowerBound(keyBytes(typedQuery.getLowerBound().get()));
+        } else if (typedQuery.getUpperBound().isPresent()) {
+            rawRangeQuery = 
RangeQuery.withUpperBound(keyBytes(typedQuery.getUpperBound().get()));
+        } else {
+            rawRangeQuery = RangeQuery.withNoBounds();
+        }
+        final QueryResult<KeyValueIterator<Bytes, byte[]>> rawResult =
+                wrapped().query(rawRangeQuery, positionBound, config);
+        if (rawResult.isSuccess()) {
+            final KeyValueIterator<Bytes, byte[]> iterator = 
rawResult.getResult();
+            final KeyValueIterator<K, V> resultIterator = new 
MeteredKeyValueTimestampedIterator(
+                    iterator,
+                    getSensor,
+                    getDeserializeValue(serdes, wrapped()),
+                    false
+            );
+            final QueryResult<KeyValueIterator<K, V>> typedQueryResult =
+                    
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(
+                            rawResult,
+                            resultIterator
+                    );
+            result = (QueryResult<R>) typedQueryResult;
+        } else {
+            // the generic type doesn't matter, since failed queries have no 
result set.
+            result = (QueryResult<R>) rawResult;
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    protected <R> QueryResult<R> runKeyQuery2(final Query<R> query,
+                                             final PositionBound positionBound,
+                                             final QueryConfig config) {
+        final QueryResult<R> result;
+        final KeyQuery<K, V> typedKeyQuery = (KeyQuery<K, V>) query;
+        final KeyQuery<Bytes, byte[]> rawKeyQuery =
+                KeyQuery.withKey(keyBytes(typedKeyQuery.getKey()));
+        final QueryResult<byte[]> rawResult =
+                wrapped().query(rawKeyQuery, positionBound, config);
+        if (rawResult.isSuccess()) {
+            final Function<byte[], ValueAndTimestamp<V>> deserializer = 
getDeserializeValue2(serdes, wrapped(), true);
+            ValueAndTimestamp<V> value = 
deserializer.apply(rawResult.getResult());
+            V res = null;
+            if (value != null) {
+                res = value.value();
+            }
+            final QueryResult<V> typedQueryResult =
+                    
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, res);
+            result = (QueryResult<R>) typedQueryResult;
+        } else {
+            // the generic type doesn't matter, since failed queries have no 
result set.
+            result = (QueryResult<R>) rawResult;
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    protected <R> QueryResult<R> runRangeQuery2(final Query<R> query,
+                                               final PositionBound 
positionBound,
+                                               final QueryConfig config) {
+
+        final QueryResult<R> result;
+        final RangeQuery<K, V> typedQuery = (RangeQuery<K, V>) query;
+        final RangeQuery<Bytes, byte[]> rawRangeQuery;
+        if (typedQuery.getLowerBound().isPresent() && 
typedQuery.getUpperBound().isPresent()) {
+            rawRangeQuery = RangeQuery.withRange(
+                    keyBytes(typedQuery.getLowerBound().get()),
+                    keyBytes(typedQuery.getUpperBound().get())
+            );
+        } else if (typedQuery.getLowerBound().isPresent()) {
+            rawRangeQuery = 
RangeQuery.withLowerBound(keyBytes(typedQuery.getLowerBound().get()));
+        } else if (typedQuery.getUpperBound().isPresent()) {
+            rawRangeQuery = 
RangeQuery.withUpperBound(keyBytes(typedQuery.getUpperBound().get()));
+        } else {
+            rawRangeQuery = RangeQuery.withNoBounds();
+        }
+        final QueryResult<KeyValueIterator<Bytes, byte[]>> rawResult =
+                wrapped().query(rawRangeQuery, positionBound, config);
+        if (rawResult.isSuccess()) {
+            final KeyValueIterator<Bytes, byte[]> iterator = 
rawResult.getResult();
+            final KeyValueIterator<K,V> resultIterator = new 
MeteredKeyValueTimestampedIterator(
+                    iterator,
+                    getSensor,
+                    getDeserializeValue(serdes, wrapped()),
+                    true
+            );
+            final QueryResult<KeyValueIterator<K, V>> typedQueryResult =
+                    
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(
+                            rawResult,
+                            resultIterator
+                    );
+            result = (QueryResult<R>) typedQueryResult;
+        } else {
+            // the generic type doesn't matter, since failed queries have no 
result set.
+            result = (QueryResult<R>) rawResult;
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    private class MeteredKeyValueTimestampedIterator implements 
KeyValueIterator<K, V> {

Review Comment:
   Seems this comment was not addressed yet?



##########
streams/src/main/java/org/apache/kafka/streams/query/TimestampedKeyQuery.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+
+import java.util.Objects;
+
+/**
+ * Interactive query for retrieving a single record based on its key.
+ */

Review Comment:
   Seems you did not address this yet.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java:
##########
@@ -102,4 +139,232 @@ static class RawAndDeserializedValue<ValueType> {
             this.value = value;
         }
     }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+                                    final PositionBound positionBound,
+                                    final QueryConfig config) {
+
+        final long start = time.nanoseconds();
+        final QueryResult<R> result;
+
+        final StoreQueryUtils.QueryHandler handler = 
queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, config);
+            if (config.isCollectExecutionInfo()) {
+                result.addExecutionInfo(
+                        "Handled in " + getClass() + " in " + 
(time.nanoseconds() - start) + "ns");
+            }
+        } else {
+            result = (QueryResult<R>) handler.apply(
+                    query,
+                    positionBound,
+                    config,
+                    this
+            );
+            if (config.isCollectExecutionInfo()) {
+                result.addExecutionInfo(
+                        "Handled in " + getClass() + " with serdes "
+                                + serdes + " in " + (time.nanoseconds() - 
start) + "ns");
+            }
+        }
+        return result;
+    }
+
+
+
+    @SuppressWarnings("unchecked")
+    protected <R> QueryResult<R> runTimestampedKeyQuery(final Query<R> query,
+                                                      final PositionBound 
positionBound,
+                                                      final QueryConfig 
config) {
+        final QueryResult<R> result;
+        final TimestampedKeyQuery<K, V> typedKeyQuery = 
(TimestampedKeyQuery<K, V>) query;
+        final KeyQuery<Bytes, byte[]> rawKeyQuery =
+                KeyQuery.withKey(keyBytes(typedKeyQuery.key()));
+        final QueryResult<byte[]> rawResult =
+                wrapped().query(rawKeyQuery, positionBound, config);
+        if (rawResult.isSuccess()) {
+            final Function<byte[], ValueAndTimestamp<V>> deserializer = 
getDeserializeValue(serdes, wrapped());
+            final ValueAndTimestamp<V> value = 
deserializer.apply(rawResult.getResult());
+            final QueryResult<ValueAndTimestamp<V>> typedQueryResult =
+                    
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, value);
+            result = (QueryResult<R>) typedQueryResult;
+        } else {
+            // the generic type doesn't matter, since failed queries have no 
result set.
+            result = (QueryResult<R>) rawResult;
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    protected <R> QueryResult<R> runTimestampedRangeQuery(final Query<R> query,
+                                                        final PositionBound 
positionBound,
+                                                        final QueryConfig 
config) {
+
+        final QueryResult<R> result;
+        final TimestampedRangeQuery<K, V> typedQuery = 
(TimestampedRangeQuery<K, V>) query;
+        RangeQuery<Bytes, byte[]> rawRangeQuery;
+        final boolean isKeyAscending = typedQuery.isKeyAscending();
+        if (typedQuery.lowerBound().isPresent() && 
typedQuery.upperBound().isPresent()) {
+            rawRangeQuery = RangeQuery.withRange(
+                    keyBytes(typedQuery.lowerBound().get()),
+                    keyBytes(typedQuery.upperBound().get())
+            );
+        } else if (typedQuery.lowerBound().isPresent()) {
+            rawRangeQuery = 
RangeQuery.withLowerBound(keyBytes(typedQuery.lowerBound().get()));
+        } else if (typedQuery.upperBound().isPresent()) {
+            rawRangeQuery = 
RangeQuery.withUpperBound(keyBytes(typedQuery.upperBound().get()));
+        } else {
+            rawRangeQuery = RangeQuery.withNoBounds();
+        }
+        if (!isKeyAscending) {
+            rawRangeQuery = rawRangeQuery.withDescendingKeys();
+        }
+        final QueryResult<KeyValueIterator<Bytes, byte[]>> rawResult =
+                wrapped().query(rawRangeQuery, positionBound, config);
+        if (rawResult.isSuccess()) {
+            final KeyValueIterator<Bytes, byte[]> iterator = 
rawResult.getResult();
+            final KeyValueIterator<K, V> resultIterator = new 
MeteredKeyValueTimestampedIterator(

Review Comment:
   Return type should be `KeyValueIterator<K, ValueAndTimestamp<V>>`, right?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java:
##########
@@ -126,15 +128,22 @@ public <R> QueryResult<R> query(
         final PositionBound positionBound,
         final QueryConfig config) {
 
-
         final long start = config.isCollectExecutionInfo() ? System.nanoTime() 
: -1L;
-        final QueryResult<R> result = store.query(query, positionBound, 
config);
+        QueryResult<R> result = store.query(query, positionBound, config);
+        Position position = result.getPosition();
+        if (result.isSuccess()) {
+            byte[] res = (byte[]) result.getResult();
+            byte[] res1 = convertToTimestampedFormat(res);
+            result = (QueryResult<R>) QueryResult.forResult(res1);
+        }
+
         if (config.isCollectExecutionInfo()) {
             final long end = System.nanoTime();
             result.addExecutionInfo(
                 "Handled in " + getClass() + " in " + (end - start) + "ns"
             );
         }
+        result.setPosition(position);

Review Comment:
   Don't see any code change that addresses this comment.
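
   For context, a rough sketch of why a copy helper matters here: building a 
fresh result and restoring only the position drops whatever execution info 
the inner store recorded. `Result` and `copyAndSubstitute` below are 
hypothetical stand-ins, not the actual `QueryResult` or 
`InternalQueryResultUtil` code, and the position is modelled as a plain 
`String` purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class CopyResultSketch {

    // Hypothetical stand-in for QueryResult<T>; not the Kafka class.
    static final class Result<T> {
        private final T value;
        private final List<String> executionInfo = new ArrayList<>();
        private String position; // stand-in for Position

        Result(final T value) {
            this.value = value;
        }

        T value() {
            return value;
        }

        void addExecutionInfo(final String info) {
            executionInfo.add(info);
        }

        List<String> executionInfo() {
            return executionInfo;
        }

        void setPosition(final String position) {
            this.position = position;
        }

        String position() {
            return position;
        }
    }

    // Copies all metadata from the raw result and swaps in the converted value.
    static <A, B> Result<B> copyAndSubstitute(final Result<A> raw, final B newValue) {
        final Result<B> copy = new Result<>(newValue);
        for (final String info : raw.executionInfo()) {
            copy.addExecutionInfo(info);
        }
        copy.setPosition(raw.position());
        return copy;
    }

    public static void main(final String[] args) {
        final Result<byte[]> raw = new Result<>(new byte[] {1});
        raw.addExecutionInfo("Handled in inner store");
        raw.setPosition("p1");

        final Result<String> copy = copyAndSubstitute(raw, "converted");
        System.out.println(copy.executionInfo().size());
        System.out.println(copy.position());
    }
}
```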



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java:
##########
@@ -102,4 +139,232 @@ static class RawAndDeserializedValue<ValueType> {
             this.value = value;
         }
     }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+                                    final PositionBound positionBound,
+                                    final QueryConfig config) {
+
+        final long start = time.nanoseconds();
+        final QueryResult<R> result;
+
+        final StoreQueryUtils.QueryHandler handler = 
queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, config);
+            if (config.isCollectExecutionInfo()) {
+                result.addExecutionInfo(
+                        "Handled in " + getClass() + " in " + 
(time.nanoseconds() - start) + "ns");
+            }
+        } else {
+            result = (QueryResult<R>) handler.apply(
+                    query,
+                    positionBound,
+                    config,
+                    this
+            );
+            if (config.isCollectExecutionInfo()) {
+                result.addExecutionInfo(
+                        "Handled in " + getClass() + " with serdes "
+                                + serdes + " in " + (time.nanoseconds() - 
start) + "ns");
+            }
+        }
+        return result;
+    }
+
+
+
+    @SuppressWarnings("unchecked")
+    protected <R> QueryResult<R> runTimestampedKeyQuery(final Query<R> query,
+                                                      final PositionBound 
positionBound,
+                                                      final QueryConfig 
config) {
+        final QueryResult<R> result;
+        final TimestampedKeyQuery<K, V> typedKeyQuery = 
(TimestampedKeyQuery<K, V>) query;
+        final KeyQuery<Bytes, byte[]> rawKeyQuery =
+                KeyQuery.withKey(keyBytes(typedKeyQuery.key()));
+        final QueryResult<byte[]> rawResult =
+                wrapped().query(rawKeyQuery, positionBound, config);
+        if (rawResult.isSuccess()) {
+            final Function<byte[], ValueAndTimestamp<V>> deserializer = 
getDeserializeValue(serdes, wrapped());
+            final ValueAndTimestamp<V> value = 
deserializer.apply(rawResult.getResult());
+            final QueryResult<ValueAndTimestamp<V>> typedQueryResult =
+                    
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, value);
+            result = (QueryResult<R>) typedQueryResult;
+        } else {
+            // the generic type doesn't matter, since failed queries have no 
result set.
+            result = (QueryResult<R>) rawResult;
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    protected <R> QueryResult<R> runTimestampedRangeQuery(final Query<R> query,
+                                                        final PositionBound 
positionBound,
+                                                        final QueryConfig 
config) {
+
+        final QueryResult<R> result;
+        final TimestampedRangeQuery<K, V> typedQuery = 
(TimestampedRangeQuery<K, V>) query;
+        RangeQuery<Bytes, byte[]> rawRangeQuery;
+        final boolean isKeyAscending = typedQuery.isKeyAscending();
+        if (typedQuery.lowerBound().isPresent() && 
typedQuery.upperBound().isPresent()) {
+            rawRangeQuery = RangeQuery.withRange(

Review Comment:
   Might be a good cleanup for `runRangeQuery(...)` too (in here, as well as 
in `MeteredKeyValueStore`) -- when `RangeQuery` was added, passing in `null` 
for upper/lower was not supported, so the code needed to be written in this 
complex way.
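
   A minimal sketch of the cleanup, assuming (as the comment implies) that 
`withRange` treats a `null` bound as "unbounded". `SimpleRange` and `encode` 
are hypothetical stand-ins for `RangeQuery` and `keyBytes(...)`; the snippet 
only shows that the four-way branching and the single call build the same 
bounds.

```java
import java.util.Optional;

public class RangeBoundsSketch {

    // Hypothetical stand-in for RangeQuery<K, V>; not the Kafka class.
    static final class SimpleRange<K> {
        private final Optional<K> lower;
        private final Optional<K> upper;

        private SimpleRange(final Optional<K> lower, final Optional<K> upper) {
            this.lower = lower;
            this.upper = upper;
        }

        // Assumption: a null bound means "unbounded" on that side.
        static <K> SimpleRange<K> withRange(final K lower, final K upper) {
            return new SimpleRange<>(Optional.ofNullable(lower), Optional.ofNullable(upper));
        }

        Optional<K> lowerBound() {
            return lower;
        }

        Optional<K> upperBound() {
            return upper;
        }
    }

    // Stand-in for keyBytes(...): converts a typed key to its raw form.
    static String encode(final Integer key) {
        return "k" + key;
    }

    // Four-way branching, mirroring the shape of the code under review.
    static SimpleRange<String> branching(final SimpleRange<Integer> typed) {
        if (typed.lowerBound().isPresent() && typed.upperBound().isPresent()) {
            return SimpleRange.withRange(
                encode(typed.lowerBound().get()), encode(typed.upperBound().get()));
        } else if (typed.lowerBound().isPresent()) {
            return SimpleRange.withRange(encode(typed.lowerBound().get()), null);
        } else if (typed.upperBound().isPresent()) {
            return SimpleRange.withRange(null, encode(typed.upperBound().get()));
        } else {
            return SimpleRange.withRange(null, null);
        }
    }

    // Single call: map each present bound, pass null for an absent one.
    static SimpleRange<String> collapsed(final SimpleRange<Integer> typed) {
        return SimpleRange.withRange(
            typed.lowerBound().map(RangeBoundsSketch::encode).orElse(null),
            typed.upperBound().map(RangeBoundsSketch::encode).orElse(null));
    }

    static boolean sameBounds(final SimpleRange<String> a, final SimpleRange<String> b) {
        return a.lowerBound().equals(b.lowerBound()) && a.upperBound().equals(b.upperBound());
    }

    public static void main(final String[] args) {
        final SimpleRange<Integer> lowerOnly = SimpleRange.withRange(1, null);
        System.out.println(sameBounds(branching(lowerOnly), collapsed(lowerOnly)));
    }
}
```

   The `map(...)` before `orElse(null)` keeps the key conversion from being 
called with a `null` key.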



