mjsax commented on code in PR #14626:
URL: https://github.com/apache/kafka/pull/14626#discussion_r1379469860


##########
streams/src/main/java/org/apache/kafka/streams/query/MultiVersionedKeyQuery.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import java.time.Instant;
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+import org.apache.kafka.streams.state.ValueIterator;
+import org.apache.kafka.streams.state.VersionedRecord;
+
+/**
+ * Interactive query for retrieving a set of records with the same specified 
key and different timestamps within the specified time range.
+ */

Review Comment:
   Add `@param` tags for the key and value types.



##########
streams/src/main/java/org/apache/kafka/streams/query/MultiVersionedKeyQuery.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import java.time.Instant;
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+import org.apache.kafka.streams.state.ValueIterator;
+import org.apache.kafka.streams.state.VersionedRecord;
+
+/**
+ * Interactive query for retrieving a set of records with the same specified 
key and different timestamps within the specified time range.
+ */
+@Evolving
+public final class MultiVersionedKeyQuery<K, V> implements 
Query<ValueIterator<VersionedRecord<V>>> {
+
+    private final K key;
+    private final Optional<Instant> fromTime;
+    private final Optional<Instant> toTime;
+    private final boolean isAscending;
+
+    private MultiVersionedKeyQuery(final K key, final Optional<Instant> 
fromTime, final Optional<Instant> toTime, final boolean isAscending) {
+        this.key = Objects.requireNonNull(key);
+        this.fromTime = fromTime;
+        this.toTime = toTime;
+        this.isAscending = isAscending;
+    }
+
+      /**
+       * Creates a query that will retrieve the set of records identified by 
{@code key} if any exists
+       * (or {@code null} otherwise).

Review Comment:
   Would we ever return `null`? I think the iterator would just be empty, but 
never `null`.
   
   Can we extend the JavaDocs similar to `VersionedKeyQuery` and describe the 
default (query the full history) versus optionally limiting the time range?



##########
streams/src/main/java/org/apache/kafka/streams/query/MultiVersionedKeyQuery.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import java.time.Instant;
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+import org.apache.kafka.streams.state.ValueIterator;
+import org.apache.kafka.streams.state.VersionedRecord;
+
+/**
+ * Interactive query for retrieving a set of records with the same specified 
key and different timestamps within the specified time range.
+ */
+@Evolving
+public final class MultiVersionedKeyQuery<K, V> implements 
Query<ValueIterator<VersionedRecord<V>>> {
+
+    private final K key;
+    private final Optional<Instant> fromTime;
+    private final Optional<Instant> toTime;
+    private final boolean isAscending;
+
+    private MultiVersionedKeyQuery(final K key, final Optional<Instant> 
fromTime, final Optional<Instant> toTime, final boolean isAscending) {
+        this.key = Objects.requireNonNull(key);

Review Comment:
   This null check should go into `withKey(...)` (to get flatter stack traces). 
We should also add an error message: `Objects.requireNonNull(key, "key cannot 
be null");`
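
A minimal sketch of the suggested shape, using a simplified stand-in class. `KeyQuerySketch` is hypothetical and is not the actual `MultiVersionedKeyQuery`. The null check sits in the static factory, so the constructor frame does not appear in the stack trace, and the exception carries a message.

```java
import java.util.Objects;

// Hypothetical, simplified stand-in for the query class under review.
final class KeyQuerySketch<K> {

    private final K key;

    private KeyQuerySketch(final K key) {
        // No validation here: the public factory below has already checked the key.
        this.key = key;
    }

    static <K> KeyQuerySketch<K> withKey(final K key) {
        // Fail at the public entry point, with an explicit error message.
        Objects.requireNonNull(key, "key cannot be null");
        return new KeyQuerySketch<>(key);
    }

    K key() {
        return key;
    }
}
```

With this layout, `KeyQuerySketch.withKey(null)` throws a `NullPointerException` whose message is "key cannot be null", raised from `withKey` itself rather than from the private constructor.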



##########
streams/src/main/java/org/apache/kafka/streams/query/MultiVersionedKeyQuery.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import java.time.Instant;
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+import org.apache.kafka.streams.state.ValueIterator;
+import org.apache.kafka.streams.state.VersionedRecord;
+
+/**
+ * Interactive query for retrieving a set of records with the same specified 
key and different timestamps within the specified time range.
+ */
+@Evolving
+public final class MultiVersionedKeyQuery<K, V> implements 
Query<ValueIterator<VersionedRecord<V>>> {
+
+    private final K key;
+    private final Optional<Instant> fromTime;
+    private final Optional<Instant> toTime;
+    private final boolean isAscending;
+
+    private MultiVersionedKeyQuery(final K key, final Optional<Instant> 
fromTime, final Optional<Instant> toTime, final boolean isAscending) {
+        this.key = Objects.requireNonNull(key);
+        this.fromTime = fromTime;
+        this.toTime = toTime;
+        this.isAscending = isAscending;
+    }
+
+      /**
+       * Creates a query that will retrieve the set of records identified by 
{@code key} if any exists
+       * (or {@code null} otherwise).
+       * @param key The key to retrieve
+       * @throws NullPointerException if @param key is null

Review Comment:
   Duplicated below (remove this line): `@throws` should be at the end.



##########
streams/src/main/java/org/apache/kafka/streams/query/MultiVersionedKeyQuery.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import java.time.Instant;
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+import org.apache.kafka.streams.state.ValueIterator;
+import org.apache.kafka.streams.state.VersionedRecord;
+
+/**
+ * Interactive query for retrieving a set of records with the same specified 
key and different timestamps within the specified time range.
+ */
+@Evolving
+public final class MultiVersionedKeyQuery<K, V> implements 
Query<ValueIterator<VersionedRecord<V>>> {
+
+    private final K key;
+    private final Optional<Instant> fromTime;
+    private final Optional<Instant> toTime;
+    private final boolean isAscending;
+
+    private MultiVersionedKeyQuery(final K key, final Optional<Instant> 
fromTime, final Optional<Instant> toTime, final boolean isAscending) {
+        this.key = Objects.requireNonNull(key);
+        this.fromTime = fromTime;
+        this.toTime = toTime;
+        this.isAscending = isAscending;
+    }
+
+      /**
+       * Creates a query that will retrieve the set of records identified by 
{@code key} if any exists
+       * (or {@code null} otherwise).
+       * @param key The key to retrieve
+       * @throws NullPointerException if @param key is null
+       * @param <K> The type of the key
+       * @param <V> The type of the value that will be retrieved
+       * @throws NullPointerException if @param key is null
+       */
+    public static <K, V> MultiVersionedKeyQuery<K, V> withKey(final K key) {
+        return new MultiVersionedKeyQuery<>(key, Optional.empty(), 
Optional.empty(), true);
+    }
+
+    /**
+     * Specifies the starting time point for the key query.
+     * <pre>
+     * The key query returns all the records that are still existing in the 
time range starting from the timestamp {@code fromTime}. There can be records 
which have been inserted before the {@code fromTime}
+     * and are valid in the query specified time range (the whole time range 
or even partially). The key query in fact returns all the records that have NOT 
become tombstone at or after {@code fromTime}.
+     * </pre>
+     * @param fromTime The starting time point
+     * If @param fromTime is null, will be considered as negative infinity
+     */
+    public MultiVersionedKeyQuery<K, V> fromTime(final Instant fromTime) {
+        if (fromTime == null) {
+            return new MultiVersionedKeyQuery<>(key, Optional.empty(), toTime, 
true);
+        }
+        return new MultiVersionedKeyQuery<>(key, Optional.of(fromTime), 
toTime, true);
+    }
+
+    /**
+     * Specifies the ending time point for the key query.
+     * The key query returns all the records that have timestamp <= {@code 
toTime}.
+     * @param toTime The ending time point
+     * If @param toTime is null, will be considered as positive infinity
+     */
+    public MultiVersionedKeyQuery<K, V> toTime(final Instant toTime) {
+        if (toTime == null) {
+            return new MultiVersionedKeyQuery<>(key, fromTime, 
Optional.empty(), true);
+        }
+        return new MultiVersionedKeyQuery<>(key, fromTime, 
Optional.of(toTime), true);
+    }
+
+    /**
+     * Specifies the order of the returned records by the query as descending 
by timestamp.
+     */
+    public MultiVersionedKeyQuery<K, V> withDescendingTimestamps() {
+        return new MultiVersionedKeyQuery<>(key, fromTime, toTime, false);
+    }
+
+    /**
+     * Specifies the order of the returned records by the query as ascending 
by timestamp.
+     */
+    public MultiVersionedKeyQuery<K, V> withAscendingTimestamps() {
+        return new MultiVersionedKeyQuery<>(key, fromTime, toTime, true);
+    }
+
+    /**
+     * The key that was specified for this query.
+     */
+    public K key() {
+        return key;
+    }
+
+    /**
+     * The starting time point of the query, if specified
+     */
+    public Optional<Instant> fromTime() {
+        return fromTime;
+    }
+
+    /**
+     * The ending time point of the query, if specified
+     */
+    public Optional<Instant> toTime() {
+        return toTime;
+    }
+
+    /**
+     * @return true if the query returns records in ascending order of 
timestamps

Review Comment:
   Add a high-level method description.



##########
streams/src/main/java/org/apache/kafka/streams/state/ValueIterator.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import java.io.Closeable;
+import java.util.Iterator;
+
+
+/**
+ * Iterator interface of {@link VersionedRecord}.
+ * <p>
+ * Users must call its {@code close} method explicitly upon completeness to 
release resources,
+ * or use try-with-resources statement (available since JDK7) for this {@link 
Closeable} class.
+ * Note that {@code remove()} is not supported.
+ *
+ * @param <V> Type of values
+ */
+public interface ValueIterator<V> extends Iterator<V>, Closeable {
+
+    @Override
+    void close();
+
+    /**
+     * Peek the next value without advancing the iterator

Review Comment:
   ```suggestion
        * Peek the next value without advancing the iterator.
   ```



##########
streams/src/main/java/org/apache/kafka/streams/state/VersionedRecord.java:
##########
@@ -37,8 +38,23 @@ public final class VersionedRecord<V> {
     public VersionedRecord(final V value, final long timestamp) {
         this.value = Objects.requireNonNull(value);
         this.timestamp = timestamp;
+        this.validTo = Long.MAX_VALUE;
     }
 
+    /**
+     * Create a new {@link VersionedRecord} instance. {@code value} cannot be 
{@code null}.
+     *
+     * @param value      the value
+     * @param timestamp  the timestamp
+     * @param validTo    the latest timestamp that value is valid

Review Comment:
   > is valid
   
   Is this correct? I thought it would be the exclusive upper bound of the 
validity interval?
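
To illustrate the reading I have in mind: if `validTo` is the exclusive upper bound, a record is valid over the half-open interval `[timestamp, validTo)`. The helper below is a hypothetical sketch for discussion only and is not part of `VersionedRecord`.

```java
// Hypothetical helper: models record validity as the half-open interval [timestamp, validTo).
final class ValidityIntervalSketch {

    private ValidityIntervalSketch() {
    }

    static boolean isValidAt(final long timestamp, final long validTo, final long queryTime) {
        // Inclusive lower bound, exclusive upper bound.
        return timestamp <= queryTime && queryTime < validTo;
    }
}
```

Under that assumption, the JavaDoc could say something like "the exclusive upper bound of the validity interval" instead of "the latest timestamp that value is valid".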



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##########
@@ -253,6 +259,86 @@ public VersionedRecord<byte[]> get(final Bytes key, final 
long asOfTimestamp) {
         return null;
     }
 
+    // Visible for testing
+    ValueIterator<VersionedRecord<byte[]>> get(final Bytes key, final long 
fromTimestamp, final long toTimestamp, final boolean isAscending) {
+
+        Objects.requireNonNull(key, "key cannot be null");
+        validateStoreOpen();
+
+        final List<VersionedRecord<byte[]>> queryResults = new ArrayList<>();
+
+        if (toTimestamp < observedStreamTime - historyRetention) {
+            // history retention exceeded. we still check the latest value 
store in case the
+            // latest record version satisfies the timestamp bound, in which 
case it should
+            // still be returned (i.e., the latest record version per key 
never expires).
+            final byte[] rawLatestValueAndTimestamp = 
latestValueStore.get(key);
+            if (rawLatestValueAndTimestamp != null) {
+                final long latestTimestamp = 
LatestValueFormatter.getTimestamp(rawLatestValueAndTimestamp);

Review Comment:
   nit: should we call it `recordTimestamp` instead of `latestTimestamp`?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##########
@@ -253,6 +259,86 @@ public VersionedRecord<byte[]> get(final Bytes key, final 
long asOfTimestamp) {
         return null;
     }
 
+    // Visible for testing
+    ValueIterator<VersionedRecord<byte[]>> get(final Bytes key, final long 
fromTimestamp, final long toTimestamp, final boolean isAscending) {
+
+        Objects.requireNonNull(key, "key cannot be null");
+        validateStoreOpen();
+
+        final List<VersionedRecord<byte[]>> queryResults = new ArrayList<>();
+
+        if (toTimestamp < observedStreamTime - historyRetention) {
+            // history retention exceeded. we still check the latest value 
store in case the
+            // latest record version satisfies the timestamp bound, in which 
case it should
+            // still be returned (i.e., the latest record version per key 
never expires).
+            final byte[] rawLatestValueAndTimestamp = 
latestValueStore.get(key);
+            if (rawLatestValueAndTimestamp != null) {
+                final long latestTimestamp = 
LatestValueFormatter.getTimestamp(rawLatestValueAndTimestamp);
+                if (latestTimestamp <= toTimestamp) {
+                    // latest value satisfies timestamp bound
+                    queryResults.add(new VersionedRecord<>(
+                        
LatestValueFormatter.getValue(rawLatestValueAndTimestamp),
+                        latestTimestamp
+                    ));
+                }
+            }
+
+            // history retention has elapsed and the latest record version (if 
present) does
+            // not satisfy the timestamp bound. return null for 
predictability, even if data
+            // is still present in segments.
+            if (queryResults.size() == 0) {
+                LOG.warn("Returning null for expired get.");
+            }
+            if (!isAscending) {
+                queryResults.sort((r1, r2) -> (int) (r1.timestamp() - 
r2.timestamp()));
+            }
+            return new VersionedRecordIterator<>(queryResults);

Review Comment:
   Btw: I think we can keep this as an optimization (no need to replace this 
with an "iterator over segment-iterators" approach).



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java:
##########
@@ -351,6 +383,21 @@ public static <V> Function<byte[], V> 
getDeserializeValue(final StateSerdes<?, V
         return byteArray -> deserializer.deserialize(serdes.topic(), 
byteArray);
     }
 
+    public static <V> ValueIterator<VersionedRecord<V>> 
deserializeValueIterator(final StateSerdes<?, V> serdes,
+        final ValueIterator<VersionedRecord<byte[]>> rawValueIterator) {
+
+        final List<VersionedRecord<V>> versionedRecords = new ArrayList<>();
+        while (rawValueIterator.hasNext()) {
+            final VersionedRecord<byte[]> rawVersionedRecord = 
rawValueIterator.peek();
+            final Deserializer<V> valueDeserializer = 
serdes.valueDeserializer();
+            final long timestamp = rawVersionedRecord.timestamp();
+            final long validTo = rawVersionedRecord.validTo();
+            final V value = valueDeserializer.deserialize(serdes.topic(), 
rawVersionedRecord.value());

Review Comment:
   I think we should not eagerly deserialize the result, but rather wrap the 
`rawValueIterator` with a `typedIterator` that only deserializes a record when 
the application requests it via a `next()` call.
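
A rough sketch of the wrapping idea, written against plain `java.util.Iterator` so that it is self-contained. `LazyMappingIterator` is a hypothetical name. A real wrapper would implement `ValueIterator` and also delegate `peek()` and `close()` to the raw iterator, which is omitted here.

```java
import java.util.Iterator;
import java.util.function.Function;

// Hypothetical sketch: converts each raw element only when next() is called.
final class LazyMappingIterator<R, T> implements Iterator<T> {

    private final Iterator<R> raw;
    private final Function<R, T> deserialize;

    LazyMappingIterator(final Iterator<R> raw, final Function<R, T> deserialize) {
        this.raw = raw;
        this.deserialize = deserialize;
    }

    @Override
    public boolean hasNext() {
        // Checking for more elements does not deserialize anything.
        return raw.hasNext();
    }

    @Override
    public T next() {
        // Deserialize exactly one element, on demand.
        return deserialize.apply(raw.next());
    }
}
```

With this shape, an application that reads only the first few versions pays the deserialization cost for those versions only, and no intermediate list of typed records is built.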



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java:
##########
@@ -157,6 +213,34 @@ protected <R> QueryResult<R> runKeyQuery(final Query<R> 
query,
             throw new UnsupportedOperationException("Versioned stores do not 
support KeyQuery queries at this time.");
         }
 
+        @SuppressWarnings("unchecked")
+        protected <R> QueryResult<R> runMultiVersionedKeyQuery(final Query<R> 
query,
+                                                               final 
PositionBound positionBound,
+                                                               final 
QueryConfig config) {
+            final QueryResult<R> result;
+            final MultiVersionedKeyQuery<K, V> typedKeyQuery = 
(MultiVersionedKeyQuery<K, V>) query;
+
+            final Instant fromTime = typedKeyQuery.fromTime().isPresent() ? 
typedKeyQuery.fromTime().get() : Instant.ofEpochMilli(Long.MIN_VALUE);
+            final Instant toTime = typedKeyQuery.toTime().isPresent() ? 
typedKeyQuery.toTime().get() : Instant.ofEpochMilli(Long.MAX_VALUE);
+            MultiVersionedKeyQuery<Bytes, byte[]> rawKeyQuery = 
MultiVersionedKeyQuery.withKey(keyBytes(typedKeyQuery.key()));
+            rawKeyQuery = rawKeyQuery.fromTime(fromTime).toTime(toTime);
+            if (!typedKeyQuery.isAscending()) {

Review Comment:
   Avoid double negation



##########
streams/src/main/java/org/apache/kafka/streams/query/MultiVersionedKeyQuery.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import java.time.Instant;
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+import org.apache.kafka.streams.state.ValueIterator;
+import org.apache.kafka.streams.state.VersionedRecord;
+
+/**
+ * Interactive query for retrieving a set of records with the same specified 
key and different timestamps within the specified time range.
+ */
+@Evolving
+public final class MultiVersionedKeyQuery<K, V> implements 
Query<ValueIterator<VersionedRecord<V>>> {
+
+    private final K key;
+    private final Optional<Instant> fromTime;
+    private final Optional<Instant> toTime;
+    private final boolean isAscending;
+
+    private MultiVersionedKeyQuery(final K key, final Optional<Instant> 
fromTime, final Optional<Instant> toTime, final boolean isAscending) {
+        this.key = Objects.requireNonNull(key);
+        this.fromTime = fromTime;
+        this.toTime = toTime;
+        this.isAscending = isAscending;
+    }
+
+      /**
+       * Creates a query that will retrieve the set of records identified by 
{@code key} if any exists
+       * (or {@code null} otherwise).
+       * @param key The key to retrieve
+       * @throws NullPointerException if @param key is null
+       * @param <K> The type of the key
+       * @param <V> The type of the value that will be retrieved
+       * @throws NullPointerException if @param key is null
+       */
+    public static <K, V> MultiVersionedKeyQuery<K, V> withKey(final K key) {
+        return new MultiVersionedKeyQuery<>(key, Optional.empty(), 
Optional.empty(), true);
+    }
+
+    /**
+     * Specifies the starting time point for the key query.
+     * <pre>
+     * The key query returns all the records that are still existing in the 
time range starting from the timestamp {@code fromTime}. There can be records 
which have been inserted before the {@code fromTime}

Review Comment:
   nit: avoid overly long lines



##########
streams/src/main/java/org/apache/kafka/streams/query/MultiVersionedKeyQuery.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import java.time.Instant;
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+import org.apache.kafka.streams.state.ValueIterator;
+import org.apache.kafka.streams.state.VersionedRecord;
+
+/**
+ * Interactive query for retrieving a set of records with the same specified 
key and different timestamps within the specified time range.
+ */
+@Evolving
+public final class MultiVersionedKeyQuery<K, V> implements 
Query<ValueIterator<VersionedRecord<V>>> {
+
+    private final K key;
+    private final Optional<Instant> fromTime;
+    private final Optional<Instant> toTime;
+    private final boolean isAscending;
+
+    private MultiVersionedKeyQuery(final K key, final Optional<Instant> 
fromTime, final Optional<Instant> toTime, final boolean isAscending) {
+        this.key = Objects.requireNonNull(key);
+        this.fromTime = fromTime;
+        this.toTime = toTime;
+        this.isAscending = isAscending;
+    }
+
+      /**
+       * Creates a query that will retrieve the set of records identified by 
{@code key} if any exists
+       * (or {@code null} otherwise).
+       * @param key The key to retrieve
+       * @throws NullPointerException if @param key is null
+       * @param <K> The type of the key
+       * @param <V> The type of the value that will be retrieved
+       * @throws NullPointerException if @param key is null
+       */
+    public static <K, V> MultiVersionedKeyQuery<K, V> withKey(final K key) {
+        return new MultiVersionedKeyQuery<>(key, Optional.empty(), 
Optional.empty(), true);
+    }
+
+    /**
+     * Specifies the starting time point for the key query.
+     * <pre>
+     * The key query returns all the records that are still existing in the 
time range starting from the timestamp {@code fromTime}. There can be records 
which have been inserted before the {@code fromTime}
+     * and are valid in the query specified time range (the whole time range 
or even partially). The key query in fact returns all the records that have NOT 
become tombstone at or after {@code fromTime}.
+     * </pre>
+     * @param fromTime The starting time point
+     * If @param fromTime is null, will be considered as negative infinity
+     */
+    public MultiVersionedKeyQuery<K, V> fromTime(final Instant fromTime) {
+        if (fromTime == null) {
+            return new MultiVersionedKeyQuery<>(key, Optional.empty(), toTime, 
true);
+        }
+        return new MultiVersionedKeyQuery<>(key, Optional.of(fromTime), 
toTime, true);
+    }
+
+    /**
+     * Specifies the ending time point for the key query.
+     * The key query returns all the records that have timestamp <= {@code 
toTime}.
+     * @param toTime The ending time point
+     * If @param toTime is null, will be considered as positive infinity
+     */
+    public MultiVersionedKeyQuery<K, V> toTime(final Instant toTime) {
+        if (toTime == null) {
+            return new MultiVersionedKeyQuery<>(key, fromTime, 
Optional.empty(), true);
+        }
+        return new MultiVersionedKeyQuery<>(key, fromTime, 
Optional.of(toTime), true);
+    }
+
+    /**
+     * Specifies the order of the returned records by the query as descending 
by timestamp.
+     */
+    public MultiVersionedKeyQuery<K, V> withDescendingTimestamps() {
+        return new MultiVersionedKeyQuery<>(key, fromTime, toTime, false);
+    }
+
+    /**
+     * Specifies the order of the returned records by the query as ascending 
by timestamp.
+     */
+    public MultiVersionedKeyQuery<K, V> withAscendingTimestamps() {
+        return new MultiVersionedKeyQuery<>(key, fromTime, toTime, true);
+    }
+
+    /**
+     * The key that was specified for this query.
+     */
+    public K key() {

Review Comment:
   Add `@return` tag (same below)



##########
streams/src/main/java/org/apache/kafka/streams/query/MultiVersionedKeyQuery.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import java.time.Instant;
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+import org.apache.kafka.streams.state.ValueIterator;
+import org.apache.kafka.streams.state.VersionedRecord;
+
+/**
+ * Interactive query for retrieving a set of records with the same specified 
key and different timestamps within the specified time range.
+ */
+@Evolving
+public final class MultiVersionedKeyQuery<K, V> implements 
Query<ValueIterator<VersionedRecord<V>>> {
+
+    private final K key;
+    private final Optional<Instant> fromTime;
+    private final Optional<Instant> toTime;
+    private final boolean isAscending;
+
+    private MultiVersionedKeyQuery(final K key, final Optional<Instant> 
fromTime, final Optional<Instant> toTime, final boolean isAscending) {
+        this.key = Objects.requireNonNull(key);
+        this.fromTime = fromTime;
+        this.toTime = toTime;
+        this.isAscending = isAscending;
+    }
+
+      /**
+       * Creates a query that will retrieve the set of records identified by 
{@code key} if any exists
+       * (or {@code null} otherwise).
+       * @param key The key to retrieve
+       * @throws NullPointerException if @param key is null
+       * @param <K> The type of the key
+       * @param <V> The type of the value that will be retrieved
+       * @throws NullPointerException if @param key is null
+       */
+    public static <K, V> MultiVersionedKeyQuery<K, V> withKey(final K key) {
+        return new MultiVersionedKeyQuery<>(key, Optional.empty(), 
Optional.empty(), true);
+    }
+
+    /**
+     * Specifies the starting time point for the key query.
+     * <pre>
+     * The key query returns all the records that are still existing in the 
time range starting from the timestamp {@code fromTime}. There can be records 
which have been inserted before the {@code fromTime}
+     * and are valid in the query specified time range (the whole time range 
or even partially). The key query in fact returns all the records that have NOT 
become tombstone at or after {@code fromTime}.
+     * </pre>
+     * @param fromTime The starting time point
+     * If @param fromTime is null, will be considered as negative infinity
+     */
+    public MultiVersionedKeyQuery<K, V> fromTime(final Instant fromTime) {
+        if (fromTime == null) {
+            return new MultiVersionedKeyQuery<>(key, Optional.empty(), toTime, 
true);
+        }
+        return new MultiVersionedKeyQuery<>(key, Optional.of(fromTime), 
toTime, true);
+    }
+
+    /**
+     * Specifies the ending time point for the key query.
+     * The key query returns all the records that have timestamp <= {@code 
toTime}.
+     * @param toTime The ending time point
+     * If @param toTime is null, will be considered as positive infinity
+     */
+    public MultiVersionedKeyQuery<K, V> toTime(final Instant toTime) {
+        if (toTime == null) {
+            return new MultiVersionedKeyQuery<>(key, fromTime, 
Optional.empty(), true);
+        }
+        return new MultiVersionedKeyQuery<>(key, fromTime, 
Optional.of(toTime), true);
+    }
+
+    /**
+     * Specifies the order of the returned records by the query as descending 
by timestamp.
+     */
+    public MultiVersionedKeyQuery<K, V> withDescendingTimestamps() {
+        return new MultiVersionedKeyQuery<>(key, fromTime, toTime, false);
+    }
+
+    /**
+     * Specifies the order of the returned records by the query as ascending 
by timestamp.
+     */
+    public MultiVersionedKeyQuery<K, V> withAscendingTimestamps() {

Review Comment:
   Just thinking about consistency again -- for `RangeQuery` (Hanyu's KIP) we 
did not add this, because it's the default anyway. Should we remove it (and 
update the KIP accordingly) -- or update Hanyu's KIP to add something similar?



##########
streams/src/main/java/org/apache/kafka/streams/state/ValueIterator.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import java.io.Closeable;
+import java.util.Iterator;
+
+
+/**
+ * Iterator interface of {@link VersionedRecord}.

Review Comment:
   Why "of `VersionedRecord`"? -- The interface itself does not say anything about 
`VersionedRecord` but takes a generic `V` type...



##########
streams/src/main/java/org/apache/kafka/streams/query/MultiVersionedKeyQuery.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import java.time.Instant;
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+import org.apache.kafka.streams.state.ValueIterator;
+import org.apache.kafka.streams.state.VersionedRecord;
+
+/**
+ * Interactive query for retrieving a set of records with the same specified 
key and different timestamps within the specified time range.
+ */
+@Evolving
+public final class MultiVersionedKeyQuery<K, V> implements 
Query<ValueIterator<VersionedRecord<V>>> {
+
+    private final K key;
+    private final Optional<Instant> fromTime;
+    private final Optional<Instant> toTime;
+    private final boolean isAscending;
+
+    private MultiVersionedKeyQuery(final K key, final Optional<Instant> 
fromTime, final Optional<Instant> toTime, final boolean isAscending) {
+        this.key = Objects.requireNonNull(key);
+        this.fromTime = fromTime;
+        this.toTime = toTime;
+        this.isAscending = isAscending;
+    }
+
+      /**
+       * Creates a query that will retrieve the set of records identified by 
{@code key} if any exists
+       * (or {@code null} otherwise).
+       * @param key The key to retrieve

Review Comment:
   nit: add newline before starting with annotations
   
   "to retrieve"? We don't retrieve the key, but the values of this key.



##########
streams/src/main/java/org/apache/kafka/streams/query/MultiVersionedKeyQuery.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import java.time.Instant;
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+import org.apache.kafka.streams.state.ValueIterator;
+import org.apache.kafka.streams.state.VersionedRecord;
+
+/**
+ * Interactive query for retrieving a set of records with the same specified 
key and different timestamps within the specified time range.
+ */
+@Evolving
+public final class MultiVersionedKeyQuery<K, V> implements 
Query<ValueIterator<VersionedRecord<V>>> {
+
+    private final K key;
+    private final Optional<Instant> fromTime;
+    private final Optional<Instant> toTime;
+    private final boolean isAscending;
+
+    private MultiVersionedKeyQuery(final K key, final Optional<Instant> 
fromTime, final Optional<Instant> toTime, final boolean isAscending) {
+        this.key = Objects.requireNonNull(key);
+        this.fromTime = fromTime;
+        this.toTime = toTime;
+        this.isAscending = isAscending;
+    }
+
+      /**
+       * Creates a query that will retrieve the set of records identified by 
{@code key} if any exists
+       * (or {@code null} otherwise).
+       * @param key The key to retrieve
+       * @throws NullPointerException if @param key is null
+       * @param <K> The type of the key
+       * @param <V> The type of the value that will be retrieved
+       * @throws NullPointerException if @param key is null
+       */
+    public static <K, V> MultiVersionedKeyQuery<K, V> withKey(final K key) {
+        return new MultiVersionedKeyQuery<>(key, Optional.empty(), 
Optional.empty(), true);
+    }
+
+    /**
+     * Specifies the starting time point for the key query.
+     * <pre>
+     * The key query returns all the records that are still existing in the 
time range starting from the timestamp {@code fromTime}. There can be records 
which have been inserted before the {@code fromTime}
+     * and are valid in the query specified time range (the whole time range 
or even partially). The key query in fact returns all the records that have NOT 
become tombstone at or after {@code fromTime}.

Review Comment:
   > NOT become tombstone
   
   If a value gets an update, it is not tombstoned either, but might still be 
returned.
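
   A minimal sketch of the case I have in mind, assuming each version is valid 
   from its own timestamp (inclusive) until the next version's timestamp 
   (exclusive), and that a version is returned if that interval overlaps the 
   queried range. `ValidityIntervalSketch`, `Version`, and `query` are 
   hypothetical names for illustration only, not the store implementation:

```java
import java.util.ArrayList;
import java.util.List;

public final class ValidityIntervalSketch {

    // Hypothetical model of one record version:
    // valid from validFrom (inclusive) until validTo (exclusive).
    static final class Version {
        final String value;
        final long validFrom;
        final long validTo;

        Version(final String value, final long validFrom, final long validTo) {
            this.value = value;
            this.validFrom = validFrom;
            this.validTo = validTo;
        }
    }

    // Assumed semantics: return every version whose validity interval
    // overlaps the queried range [fromTime, toTime].
    static List<String> query(final List<Version> versions, final long fromTime, final long toTime) {
        final List<String> result = new ArrayList<>();
        for (final Version version : versions) {
            if (version.validFrom <= toTime && version.validTo > fromTime) {
                result.add(version.value);
            }
        }
        return result;
    }
}
```

   With `v1` valid over [10, 20) and `v2` written at 20, a query starting at 15 
   would return `v1` as well: it was replaced by an update, not by a tombstone.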



##########
streams/src/main/java/org/apache/kafka/streams/query/MultiVersionedKeyQuery.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import java.time.Instant;
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+import org.apache.kafka.streams.state.ValueIterator;
+import org.apache.kafka.streams.state.VersionedRecord;
+
+/**
+ * Interactive query for retrieving a set of records with the same specified 
key and different timestamps within the specified time range.
+ */
+@Evolving
+public final class MultiVersionedKeyQuery<K, V> implements 
Query<ValueIterator<VersionedRecord<V>>> {
+
+    private final K key;
+    private final Optional<Instant> fromTime;
+    private final Optional<Instant> toTime;
+    private final boolean isAscending;
+
+    private MultiVersionedKeyQuery(final K key, final Optional<Instant> 
fromTime, final Optional<Instant> toTime, final boolean isAscending) {
+        this.key = Objects.requireNonNull(key);
+        this.fromTime = fromTime;
+        this.toTime = toTime;
+        this.isAscending = isAscending;
+    }
+
+      /**
+       * Creates a query that will retrieve the set of records identified by 
{@code key} if any exists
+       * (or {@code null} otherwise).
+       * @param key The key to retrieve
+       * @throws NullPointerException if @param key is null
+       * @param <K> The type of the key
+       * @param <V> The type of the value that will be retrieved
+       * @throws NullPointerException if @param key is null
+       */
+    public static <K, V> MultiVersionedKeyQuery<K, V> withKey(final K key) {
+        return new MultiVersionedKeyQuery<>(key, Optional.empty(), 
Optional.empty(), true);
+    }
+
+    /**
+     * Specifies the starting time point for the key query.
+     * <pre>

Review Comment:
   Why are you using `<pre>` here? We should use `<p>` (note: for `<p>` no 
closing tag is required in JavaDocs -- JavaDocs is not the same as HTML even if 
it's borrowing a lot of stuff from it)
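
   Roughly what I would expect the comment to look like, as a sketch only. 
   `ParagraphTagSketch` and its pass-through method are hypothetical 
   placeholders so that the snippet compiles, and the wording is not a proposal 
   for the final text:

```java
import java.time.Instant;

public final class ParagraphTagSketch {

    /**
     * Specifies the starting time point for the key query.
     * <p>
     * Records inserted before {@code fromTime} can still be returned if they are
     * valid within the queried time range.
     * <p>
     * Each further paragraph starts with another {@code <p>}; no closing tag is used.
     *
     * @param fromTime The starting time point
     * @return the given starting time point, unchanged (placeholder behavior for this sketch)
     */
    public static Instant fromTime(final Instant fromTime) {
        return fromTime;
    }
}
```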



##########
streams/src/main/java/org/apache/kafka/streams/query/MultiVersionedKeyQuery.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import java.time.Instant;
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+import org.apache.kafka.streams.state.ValueIterator;
+import org.apache.kafka.streams.state.VersionedRecord;
+
+/**
+ * Interactive query for retrieving a set of records with the same specified 
key and different timestamps within the specified time range.
+ */
+@Evolving
+public final class MultiVersionedKeyQuery<K, V> implements 
Query<ValueIterator<VersionedRecord<V>>> {
+
+    private final K key;
+    private final Optional<Instant> fromTime;
+    private final Optional<Instant> toTime;
+    private final boolean isAscending;
+
+    private MultiVersionedKeyQuery(final K key, final Optional<Instant> 
fromTime, final Optional<Instant> toTime, final boolean isAscending) {
+        this.key = Objects.requireNonNull(key);
+        this.fromTime = fromTime;
+        this.toTime = toTime;
+        this.isAscending = isAscending;
+    }
+
+      /**
+       * Creates a query that will retrieve the set of records identified by 
{@code key} if any exists
+       * (or {@code null} otherwise).
+       * @param key The key to retrieve
+       * @throws NullPointerException if @param key is null
+       * @param <K> The type of the key
+       * @param <V> The type of the value that will be retrieved
+       * @throws NullPointerException if @param key is null
+       */
+    public static <K, V> MultiVersionedKeyQuery<K, V> withKey(final K key) {
+        return new MultiVersionedKeyQuery<>(key, Optional.empty(), 
Optional.empty(), true);
+    }
+
+    /**
+     * Specifies the starting time point for the key query.
+     * <pre>
+     * The key query returns all the records that are still existing in the 
time range starting from the timestamp {@code fromTime}. There can be records 
which have been inserted before the {@code fromTime}
+     * and are valid in the query specified time range (the whole time range 
or even partially). The key query in fact returns all the records that have NOT 
become tombstone at or after {@code fromTime}.
+     * </pre>
+     * @param fromTime The starting time point
+     * If @param fromTime is null, will be considered as negative infinity
+     */
+    public MultiVersionedKeyQuery<K, V> fromTime(final Instant fromTime) {
+        if (fromTime == null) {
+            return new MultiVersionedKeyQuery<>(key, Optional.empty(), toTime, 
true);
+        }
+        return new MultiVersionedKeyQuery<>(key, Optional.of(fromTime), 
toTime, true);
+    }
+
+    /**
+     * Specifies the ending time point for the key query.
+     * The key query returns all the records that have timestamp <= {@code 
toTime}.
+     * @param toTime The ending time point
+     * If @param toTime is null, will be considered as positive infinity

Review Comment:
   as above



##########
streams/src/main/java/org/apache/kafka/streams/state/ValueIterator.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import java.io.Closeable;
+import java.util.Iterator;
+
+
+/**
+ * Iterator interface of {@link VersionedRecord}.
+ * <p>
+ * Users must call its {@code close} method explicitly upon completeness to 
release resources,
+ * or use try-with-resources statement (available since JDK7) for this {@link 
Closeable} class.
+ * Note that {@code remove()} is not supported.
+ *
+ * @param <V> Type of values
+ */
+public interface ValueIterator<V> extends Iterator<V>, Closeable {
+
+    @Override
+    void close();
+
+    /**
+     * Peek the next value without advancing the iterator
+     * @return the value that would be returned from the next call to next

Review Comment:
   ```suggestion
        * @return The value that would be returned from the next call to {@link 
#next()}.
   ```



##########
streams/src/main/java/org/apache/kafka/streams/query/MultiVersionedKeyQuery.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import java.time.Instant;
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+import org.apache.kafka.streams.state.ValueIterator;
+import org.apache.kafka.streams.state.VersionedRecord;
+
+/**
+ * Interactive query for retrieving a set of records with the same specified 
key and different timestamps within the specified time range.
+ */
+@Evolving
+public final class MultiVersionedKeyQuery<K, V> implements 
Query<ValueIterator<VersionedRecord<V>>> {
+
+    private final K key;
+    private final Optional<Instant> fromTime;
+    private final Optional<Instant> toTime;
+    private final boolean isAscending;
+
+    private MultiVersionedKeyQuery(final K key, final Optional<Instant> 
fromTime, final Optional<Instant> toTime, final boolean isAscending) {
+        this.key = Objects.requireNonNull(key);
+        this.fromTime = fromTime;
+        this.toTime = toTime;
+        this.isAscending = isAscending;
+    }
+
+      /**
+       * Creates a query that will retrieve the set of records identified by 
{@code key} if any exists
+       * (or {@code null} otherwise).
+       * @param key The key to retrieve
+       * @throws NullPointerException if @param key is null
+       * @param <K> The type of the key
+       * @param <V> The type of the value that will be retrieved
+       * @throws NullPointerException if @param key is null
+       */
+    public static <K, V> MultiVersionedKeyQuery<K, V> withKey(final K key) {
+        return new MultiVersionedKeyQuery<>(key, Optional.empty(), 
Optional.empty(), true);
+    }
+
+    /**
+     * Specifies the starting time point for the key query.
+     * <pre>
+     * The key query returns all the records that are still existing in the 
time range starting from the timestamp {@code fromTime}. There can be records 
which have been inserted before the {@code fromTime}
+     * and are valid in the query specified time range (the whole time range 
or even partially). The key query in fact returns all the records that have NOT 
become tombstone at or after {@code fromTime}.
+     * </pre>
+     * @param fromTime The starting time point
+     * If @param fromTime is null, will be considered as negative infinity
+     */
+    public MultiVersionedKeyQuery<K, V> fromTime(final Instant fromTime) {
+        if (fromTime == null) {
+            return new MultiVersionedKeyQuery<>(key, Optional.empty(), toTime, 
true);
+        }
+        return new MultiVersionedKeyQuery<>(key, Optional.of(fromTime), 
toTime, true);
+    }
+
+    /**
+     * Specifies the ending time point for the key query.
+     * The key query returns all the records that have timestamp <= {@code 
toTime}.

Review Comment:
   I am flexible, but we should try to be consistent with formatting. In 
`VersionedKeyQuery` you used
   ```
   timestamp <= toTime.
   ```
   w/o the `{@code}` annotation. (also `<=` needs escaping)
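
   For reference, a sketch of the two ways the comparison could be written so 
   that JavaDoc does not trip over the `<`. `EscapedComparisonSketch` and 
   `withinUpperBound` are hypothetical and only exist to give the comment 
   something to attach to:

```java
public final class EscapedComparisonSketch {

    /**
     * Checks the upper time bound of a query.
     * <p>
     * Returns {@code true} for records that have timestamp &lt;= toTime
     * (equivalently written as {@code timestamp <= toTime}).
     *
     * @param timestamp The record timestamp in milliseconds
     * @param toTime    The ending time point in milliseconds
     * @return whether the record timestamp is within the upper bound
     */
    public static boolean withinUpperBound(final long timestamp, final long toTime) {
        return timestamp <= toTime;
    }
}
```

   Either form is fine; the point is to pick one and use it in both 
   `VersionedKeyQuery` and `MultiVersionedKeyQuery`.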



##########
streams/src/main/java/org/apache/kafka/streams/query/MultiVersionedKeyQuery.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import java.time.Instant;
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+import org.apache.kafka.streams.state.ValueIterator;
+import org.apache.kafka.streams.state.VersionedRecord;
+
+/**
+ * Interactive query for retrieving a set of records with the same specified 
key and different timestamps within the specified time range.
+ */
+@Evolving
+public final class MultiVersionedKeyQuery<K, V> implements 
Query<ValueIterator<VersionedRecord<V>>> {
+
+    private final K key;
+    private final Optional<Instant> fromTime;
+    private final Optional<Instant> toTime;
+    private final boolean isAscending;
+
+    private MultiVersionedKeyQuery(final K key, final Optional<Instant> 
fromTime, final Optional<Instant> toTime, final boolean isAscending) {
+        this.key = Objects.requireNonNull(key);
+        this.fromTime = fromTime;
+        this.toTime = toTime;
+        this.isAscending = isAscending;
+    }
+
+      /**
+       * Creates a query that will retrieve the set of records identified by 
{@code key} if any exists
+       * (or {@code null} otherwise).
+       * @param key The key to retrieve
+       * @throws NullPointerException if @param key is null
+       * @param <K> The type of the key
+       * @param <V> The type of the value that will be retrieved
+       * @throws NullPointerException if @param key is null
+       */
+    public static <K, V> MultiVersionedKeyQuery<K, V> withKey(final K key) {
+        return new MultiVersionedKeyQuery<>(key, Optional.empty(), 
Optional.empty(), true);
+    }
+
+    /**
+     * Specifies the starting time point for the key query.
+     * <pre>
+     * The key query returns all the records that are still existing in the 
time range starting from the timestamp {@code fromTime}. There can be records 
which have been inserted before the {@code fromTime}
+     * and are valid in the query specified time range (the whole time range 
or even partially). The key query in fact returns all the records that have NOT 
become tombstone at or after {@code fromTime}.
+     * </pre>
+     * @param fromTime The starting time point
+     * If @param fromTime is null, will be considered as negative infinity

Review Comment:
   `@param` is not valid JavaDoc here (use `{@code fromTime}` instead)
   
   Maybe also `{@code null}`
   
   `as negative infinity, ie, no lower bound`
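
   Put together, something along these lines (a sketch; `ParamJavadocSketch` 
   and `hasLowerBound` are hypothetical and only there to carry the comment):

```java
import java.time.Instant;

public final class ParamJavadocSketch {

    /**
     * Specifies the starting time point for the key query.
     *
     * @param fromTime The starting time point. If {@code fromTime} is {@code null},
     *                 it is considered as negative infinity, i.e., no lower bound.
     * @return whether a lower bound was given (placeholder behavior for this sketch)
     */
    public static boolean hasLowerBound(final Instant fromTime) {
        return fromTime != null;
    }
}
```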



##########
streams/src/main/java/org/apache/kafka/streams/query/MultiVersionedKeyQuery.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import java.time.Instant;
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+import org.apache.kafka.streams.state.ValueIterator;
+import org.apache.kafka.streams.state.VersionedRecord;
+
+/**
+ * Interactive query for retrieving a set of records with the same specified 
key and different timestamps within the specified time range.
+ */
+@Evolving
+public final class MultiVersionedKeyQuery<K, V> implements 
Query<ValueIterator<VersionedRecord<V>>> {
+
+    private final K key;
+    private final Optional<Instant> fromTime;
+    private final Optional<Instant> toTime;
+    private final boolean isAscending;
+
+    private MultiVersionedKeyQuery(final K key, final Optional<Instant> 
fromTime, final Optional<Instant> toTime, final boolean isAscending) {
+        this.key = Objects.requireNonNull(key);
+        this.fromTime = fromTime;
+        this.toTime = toTime;
+        this.isAscending = isAscending;
+    }
+
+      /**
+       * Creates a query that will retrieve the set of records identified by 
{@code key} if any exists
+       * (or {@code null} otherwise).
+       * @param key The key to retrieve
+       * @throws NullPointerException if @param key is null
+       * @param <K> The type of the key
+       * @param <V> The type of the value that will be retrieved
+       * @throws NullPointerException if @param key is null
+       */
+    public static <K, V> MultiVersionedKeyQuery<K, V> withKey(final K key) {
+        return new MultiVersionedKeyQuery<>(key, Optional.empty(), 
Optional.empty(), true);
+    }
+
+    /**
+     * Specifies the starting time point for the key query.
+     * <pre>
+     * The key query returns all the records that are still existing in the 
time range starting from the timestamp {@code fromTime}. There can be records 
which have been inserted before the {@code fromTime}
+     * and are valid in the query specified time range (the whole time range 
or even partially). The key query in fact returns all the records that have NOT 
become tombstone at or after {@code fromTime}.
+     * </pre>
+     * @param fromTime The starting time point
+     * If @param fromTime is null, will be considered as negative infinity
+     */
+    public MultiVersionedKeyQuery<K, V> fromTime(final Instant fromTime) {
+        if (fromTime == null) {

Review Comment:
   Don't think we need this check, but can use `Optional.ofNullable(fromTime)`
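
   A minimal sketch of the simplification, using a stripped-down stand-in that 
   models only the lower time bound. `FromTimeSketch` and `unbounded()` are 
   hypothetical and not part of the PR:

```java
import java.time.Instant;
import java.util.Optional;

// Hypothetical stand-in for the query class; only the lower time bound is modeled.
public final class FromTimeSketch {

    private final Optional<Instant> fromTime;

    private FromTimeSketch(final Optional<Instant> fromTime) {
        this.fromTime = fromTime;
    }

    public static FromTimeSketch unbounded() {
        return new FromTimeSketch(Optional.empty());
    }

    // Optional.ofNullable maps null to Optional.empty(),
    // so the explicit null check and the second constructor call go away.
    public FromTimeSketch fromTime(final Instant fromTime) {
        return new FromTimeSketch(Optional.ofNullable(fromTime));
    }

    public Optional<Instant> fromTime() {
        return fromTime;
    }
}
```

   The same pattern would apply to `toTime`.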



##########
streams/src/main/java/org/apache/kafka/streams/query/MultiVersionedKeyQuery.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import java.time.Instant;
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+import org.apache.kafka.streams.state.ValueIterator;
+import org.apache.kafka.streams.state.VersionedRecord;
+
+/**
+ * Interactive query for retrieving a set of records with the same specified 
key and different timestamps within the specified time range.
+ */
+@Evolving
+public final class MultiVersionedKeyQuery<K, V> implements 
Query<ValueIterator<VersionedRecord<V>>> {
+
+    private final K key;
+    private final Optional<Instant> fromTime;
+    private final Optional<Instant> toTime;
+    private final boolean isAscending;
+
+    private MultiVersionedKeyQuery(final K key, final Optional<Instant> 
fromTime, final Optional<Instant> toTime, final boolean isAscending) {
+        this.key = Objects.requireNonNull(key);
+        this.fromTime = fromTime;
+        this.toTime = toTime;
+        this.isAscending = isAscending;
+    }
+
+      /**
+       * Creates a query that will retrieve the set of records identified by 
{@code key} if any exists
+       * (or {@code null} otherwise).
+       * @param key The key to retrieve
+       * @throws NullPointerException if @param key is null
+       * @param <K> The type of the key
+       * @param <V> The type of the value that will be retrieved
+       * @throws NullPointerException if @param key is null
+       */
+    public static <K, V> MultiVersionedKeyQuery<K, V> withKey(final K key) {
+        return new MultiVersionedKeyQuery<>(key, Optional.empty(), 
Optional.empty(), true);
+    }
+
+    /**
+     * Specifies the starting time point for the key query.
+     * <pre>
+     * The key query returns all the records that are still existing in the 
time range starting from the timestamp {@code fromTime}. There can be records 
which have been inserted before the {@code fromTime}
+     * and are valid in the query specified time range (the whole time range 
or even partially). The key query in fact returns all the records that have NOT 
become tombstone at or after {@code fromTime}.
+     * </pre>
+     * @param fromTime The starting time point
+     * If @param fromTime is null, will be considered as negative infinity
+     */
+    public MultiVersionedKeyQuery<K, V> fromTime(final Instant fromTime) {
+        if (fromTime == null) {
+            return new MultiVersionedKeyQuery<>(key, Optional.empty(), toTime, 
true);
+        }
+        return new MultiVersionedKeyQuery<>(key, Optional.of(fromTime), 
toTime, true);
+    }
+
+    /**
+     * Specifies the ending time point for the key query.
+     * The key query returns all the records that have timestamp <= {@code 
toTime}.
+     * @param toTime The ending time point
+     * If @param toTime is null, will be considered as positive infinity
+     */
+    public MultiVersionedKeyQuery<K, V> toTime(final Instant toTime) {
+        if (toTime == null) {

Review Comment:
   as above



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java:
##########
@@ -157,6 +213,34 @@ protected <R> QueryResult<R> runKeyQuery(final Query<R> 
query,
             throw new UnsupportedOperationException("Versioned stores do not 
support KeyQuery queries at this time.");
         }
 
+        @SuppressWarnings("unchecked")
+        protected <R> QueryResult<R> runMultiVersionedKeyQuery(final Query<R> 
query,
+                                                               final 
PositionBound positionBound,
+                                                               final 
QueryConfig config) {
+            final QueryResult<R> result;
+            final MultiVersionedKeyQuery<K, V> typedKeyQuery = 
(MultiVersionedKeyQuery<K, V>) query;
+
+            final Instant fromTime = typedKeyQuery.fromTime().isPresent() ? 
typedKeyQuery.fromTime().get() : Instant.ofEpochMilli(Long.MIN_VALUE);
+            final Instant toTime = typedKeyQuery.toTime().isPresent() ? 
typedKeyQuery.toTime().get() : Instant.ofEpochMilli(Long.MAX_VALUE);
+            MultiVersionedKeyQuery<Bytes, byte[]> rawKeyQuery = 
MultiVersionedKeyQuery.withKey(keyBytes(typedKeyQuery.key()));
+            rawKeyQuery = rawKeyQuery.fromTime(fromTime).toTime(toTime);
+            if (!typedKeyQuery.isAscending()) {

Review Comment:
   ```suggestion
               if (typedKeyQuery.isDescending()) {
   ```



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##########
@@ -253,6 +259,86 @@ public VersionedRecord<byte[]> get(final Bytes key, final 
long asOfTimestamp) {
         return null;
     }
 
+    // Visible for testing
+    ValueIterator<VersionedRecord<byte[]>> get(final Bytes key, final long 
fromTimestamp, final long toTimestamp, final boolean isAscending) {
+
+        Objects.requireNonNull(key, "key cannot be null");
+        validateStoreOpen();
+
+        final List<VersionedRecord<byte[]>> queryResults = new ArrayList<>();
+
+        if (toTimestamp < observedStreamTime - historyRetention) {
+            // history retention exceeded. we still check the latest value 
store in case the
+            // latest record version satisfies the timestamp bound, in which 
case it should
+            // still be returned (i.e., the latest record version per key 
never expires).
+            final byte[] rawLatestValueAndTimestamp = 
latestValueStore.get(key);
+            if (rawLatestValueAndTimestamp != null) {
+                final long latestTimestamp = 
LatestValueFormatter.getTimestamp(rawLatestValueAndTimestamp);
+                if (latestTimestamp <= toTimestamp) {
+                    // latest value satisfies timestamp bound
+                    queryResults.add(new VersionedRecord<>(
+                        
LatestValueFormatter.getValue(rawLatestValueAndTimestamp),
+                        latestTimestamp
+                    ));
+                }
+            }
+
+            // history retention has elapsed and the latest record version (if 
present) does
+            // not satisfy the timestamp bound. return null for 
predictability, even if data
+            // is still present in segments.
+            if (queryResults.size() == 0) {
+                LOG.warn("Returning null for expired get.");

Review Comment:
   Why do we log a WARNING here? Returning nothing seems to be regular 
execution -- we should not "worry" users about this.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java:
##########
@@ -157,6 +213,34 @@ protected <R> QueryResult<R> runKeyQuery(final Query<R> 
query,
             throw new UnsupportedOperationException("Versioned stores do not 
support KeyQuery queries at this time.");
         }
 
+        @SuppressWarnings("unchecked")
+        protected <R> QueryResult<R> runMultiVersionedKeyQuery(final Query<R> 
query,

Review Comment:
   Why do we add this as `protected` ? Should be `private` IMHO.



##########
streams/src/main/java/org/apache/kafka/streams/state/VersionedRecord.java:
##########
@@ -37,8 +38,23 @@ public final class VersionedRecord<V> {
     public VersionedRecord(final V value, final long timestamp) {
         this.value = Objects.requireNonNull(value);
         this.timestamp = timestamp;
+        this.validTo = Long.MAX_VALUE;
     }
 
+    /**
+     * Create a new {@link VersionedRecord} instance. {@code value} cannot be 
{@code null}.
+     *
+     * @param value      the value
+     * @param timestamp  the timestamp
+     * @param validTo    the latest timestamp that value is valid
+     */
+    public VersionedRecord(final V value, final long timestamp, final long 
validTo) {
+        this.value = Objects.requireNonNull(value);

Review Comment:
   We should add an error message (maybe also fix in the existing constructor)
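   
   A minimal sketch of what I mean, assuming a simplified stand-in class. `RecordSketch` is a hypothetical name used only for illustration, and the message text is an assumption modeled on the `"key cannot be null"` message used elsewhere in this PR:
   
   ```java
   import java.util.Objects;
   
   // Hypothetical stand-in for illustration; not the PR's VersionedRecord.
   final class RecordSketch<V> {
       private final V value;
       private final long timestamp;
   
       RecordSketch(final V value, final long timestamp) {
           // The second argument becomes the NullPointerException message.
           this.value = Objects.requireNonNull(value, "value cannot be null");
           this.timestamp = timestamp;
       }
   
       V value() {
           return value;
       }
   
       long timestamp() {
           return timestamp;
       }
   
       public static void main(final String[] args) {
           try {
               new RecordSketch<String>(null, 0L);
           } catch (final NullPointerException e) {
               System.out.println(e.getMessage()); // prints "value cannot be null"
           }
       }
   }
   ```
   
   With the message in place, a stack trace from either constructor says which argument was null.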



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##########
@@ -253,6 +259,86 @@ public VersionedRecord<byte[]> get(final Bytes key, final 
long asOfTimestamp) {
         return null;
     }
 
+    // Visible for testing
+    ValueIterator<VersionedRecord<byte[]>> get(final Bytes key, final long 
fromTimestamp, final long toTimestamp, final boolean isAscending) {
+
+        Objects.requireNonNull(key, "key cannot be null");
+        validateStoreOpen();
+
+        final List<VersionedRecord<byte[]>> queryResults = new ArrayList<>();
+
+        if (toTimestamp < observedStreamTime - historyRetention) {
+            // history retention exceeded. we still check the latest value 
store in case the
+            // latest record version satisfies the timestamp bound, in which 
case it should
+            // still be returned (i.e., the latest record version per key 
never expires).
+            final byte[] rawLatestValueAndTimestamp = 
latestValueStore.get(key);
+            if (rawLatestValueAndTimestamp != null) {
+                final long latestTimestamp = 
LatestValueFormatter.getTimestamp(rawLatestValueAndTimestamp);
+                if (latestTimestamp <= toTimestamp) {
+                    // latest value satisfies timestamp bound
+                    queryResults.add(new VersionedRecord<>(
+                        
LatestValueFormatter.getValue(rawLatestValueAndTimestamp),
+                        latestTimestamp
+                    ));
+                }
+            }
+
+            // history retention has elapsed and the latest record version (if 
present) does
+            // not satisfy the timestamp bound. return null for 
predictability, even if data
+            // is still present in segments.
+            if (queryResults.size() == 0) {
+                LOG.warn("Returning null for expired get.");
+            }
+            if (!isAscending) {
+                queryResults.sort((r1, r2) -> (int) (r1.timestamp() - 
r2.timestamp()));

Review Comment:
   For this case, we know that the result contains zero or one record -- seems we can 
skip this one?



##########
streams/src/main/java/org/apache/kafka/streams/state/VersionedRecord.java:
##########
@@ -47,6 +63,10 @@ public long timestamp() {
         return timestamp;
     }
 
+    public long validTo() {
+        return validTo;
+    }
+
     @Override
     public String toString() {
         return "<" + value + "," + timestamp + ">";

Review Comment:
   Needs an update (same for `equals()` and `hashCode()` below)
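   
   Roughly what I have in mind, as a sketch only. `VersionedValueSketch` is a hypothetical stand-in, and the exact `toString()` layout is an assumption that just extends the existing `<value,timestamp>` format:
   
   ```java
   import java.util.Objects;
   
   // Hypothetical stand-in for illustration; not the PR's VersionedRecord.
   final class VersionedValueSketch<V> {
       private final V value;
       private final long timestamp;
       private final long validTo;
   
       VersionedValueSketch(final V value, final long timestamp, final long validTo) {
           this.value = Objects.requireNonNull(value, "value cannot be null");
           this.timestamp = timestamp;
           this.validTo = validTo;
       }
   
       @Override
       public String toString() {
           // validTo is appended to the existing "<value,timestamp>" format.
           return "<" + value + "," + timestamp + "," + validTo + ">";
       }
   
       @Override
       public boolean equals(final Object o) {
           if (this == o) {
               return true;
           }
           if (o == null || getClass() != o.getClass()) {
               return false;
           }
           final VersionedValueSketch<?> that = (VersionedValueSketch<?>) o;
           // validTo takes part in equality alongside value and timestamp.
           return timestamp == that.timestamp
               && validTo == that.validTo
               && Objects.equals(value, that.value);
       }
   
       @Override
       public int hashCode() {
           // Must use the same fields as equals().
           return Objects.hash(value, timestamp, validTo);
       }
   
       public static void main(final String[] args) {
           final VersionedValueSketch<String> a = new VersionedValueSketch<>("v", 10L, 20L);
           final VersionedValueSketch<String> b = new VersionedValueSketch<>("v", 10L, 30L);
           System.out.println(a);           // prints "<v,10,20>"
           System.out.println(a.equals(b)); // prints "false"
       }
   }
   ```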



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##########
@@ -253,6 +259,86 @@ public VersionedRecord<byte[]> get(final Bytes key, final 
long asOfTimestamp) {
         return null;
     }
 
+    // Visible for testing
+    ValueIterator<VersionedRecord<byte[]>> get(final Bytes key, final long 
fromTimestamp, final long toTimestamp, final boolean isAscending) {
+
+        Objects.requireNonNull(key, "key cannot be null");

Review Comment:
   Do we need this on an internal method (it's not `public`)?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java:
##########
@@ -335,6 +345,28 @@ private static <R> QueryResult<R> 
runWindowRangeQuery(final Query<R> query,
         }
     }
 
+    @SuppressWarnings("unchecked")
+    private static <R> QueryResult<R> runMultiVersionedKeyQuery(final Query<R> 
query,
+                                                                final 
PositionBound positionBound,
+                                                                final 
QueryConfig config,
+                                                                final 
StateStore store) {
+        if (store instanceof VersionedKeyValueStore) {
+            final RocksDBVersionedStore rocksDBVersionedStore = 
(RocksDBVersionedStore) store;
+            final MultiVersionedKeyQuery<Bytes, byte[]> rawKeyQuery = 
(MultiVersionedKeyQuery<Bytes, byte[]>) query;
+            try {
+                final long fromTime = rawKeyQuery.fromTime().isPresent() ? 
rawKeyQuery.fromTime().get().toEpochMilli() : Long.MIN_VALUE;

Review Comment:
   Do we need the `isPresent()` check -- seems we do this already in the 
"metered layer" -- seems redundant? 



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##########
@@ -253,6 +259,86 @@ public VersionedRecord<byte[]> get(final Bytes key, final 
long asOfTimestamp) {
         return null;
     }
 
+    // Visible for testing
+    ValueIterator<VersionedRecord<byte[]>> get(final Bytes key, final long 
fromTimestamp, final long toTimestamp, final boolean isAscending) {
+
+        Objects.requireNonNull(key, "key cannot be null");
+        validateStoreOpen();
+
+        final List<VersionedRecord<byte[]>> queryResults = new ArrayList<>();
+
+        if (toTimestamp < observedStreamTime - historyRetention) {
+            // history retention exceeded. we still check the latest value 
store in case the
+            // latest record version satisfies the timestamp bound, in which 
case it should
+            // still be returned (i.e., the latest record version per key 
never expires).
+            final byte[] rawLatestValueAndTimestamp = 
latestValueStore.get(key);
+            if (rawLatestValueAndTimestamp != null) {
+                final long latestTimestamp = 
LatestValueFormatter.getTimestamp(rawLatestValueAndTimestamp);
+                if (latestTimestamp <= toTimestamp) {
+                    // latest value satisfies timestamp bound
+                    queryResults.add(new VersionedRecord<>(
+                        
LatestValueFormatter.getValue(rawLatestValueAndTimestamp),
+                        latestTimestamp
+                    ));
+                }
+            }
+
+            // history retention has elapsed and the latest record version (if 
present) does
+            // not satisfy the timestamp bound. return null for 
predictability, even if data
+            // is still present in segments.
+            if (queryResults.size() == 0) {
+                LOG.warn("Returning null for expired get.");
+            }
+            if (!isAscending) {
+                queryResults.sort((r1, r2) -> (int) (r1.timestamp() - 
r2.timestamp()));
+            }
+            return new VersionedRecordIterator<>(queryResults);
+        } else {
+            // first check the latest value store
+            final byte[] rawLatestValueAndTimestamp = 
latestValueStore.get(key);
+            if (rawLatestValueAndTimestamp != null) {
+                final long latestTimestamp = LatestValueFormatter.getTimestamp(
+                    rawLatestValueAndTimestamp);
+                if (latestTimestamp <= toTimestamp) {
+                    queryResults.add(new VersionedRecord<>(
+                        
LatestValueFormatter.getValue(rawLatestValueAndTimestamp),
+                        latestTimestamp));
+                }
+            }
+
+            // check segment stores
+            final List<LogicalKeyValueSegment> segments = 
segmentStores.segments(Long.MIN_VALUE,
+                toTimestamp, false);
+            for (final LogicalKeyValueSegment segment : segments) {
+                final byte[] rawSegmentValue = segment.get(key);
+                if (rawSegmentValue != null) {
+
+                    if 
(RocksDBVersionedStoreSegmentValueFormatter.getNextTimestamp(rawSegmentValue) < 
fromTimestamp) {
+                        // this segment contains no data for the queried time 
range, so earlier segments
+                        // cannot either
+                        break;
+                    }
+
+                    // the desired result is contained in this segment

Review Comment:
   Sounds as if this segment contains the full result...
   ```
   // this segment contains data belonging to the result
   ```



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##########
@@ -253,6 +259,86 @@ public VersionedRecord<byte[]> get(final Bytes key, final 
long asOfTimestamp) {
         return null;
     }
 
+    // Visible for testing
+    ValueIterator<VersionedRecord<byte[]>> get(final Bytes key, final long 
fromTimestamp, final long toTimestamp, final boolean isAscending) {
+
+        Objects.requireNonNull(key, "key cannot be null");
+        validateStoreOpen();
+
+        final List<VersionedRecord<byte[]>> queryResults = new ArrayList<>();
+
+        if (toTimestamp < observedStreamTime - historyRetention) {
+            // history retention exceeded. we still check the latest value 
store in case the
+            // latest record version satisfies the timestamp bound, in which 
case it should
+            // still be returned (i.e., the latest record version per key 
never expires).
+            final byte[] rawLatestValueAndTimestamp = 
latestValueStore.get(key);
+            if (rawLatestValueAndTimestamp != null) {
+                final long latestTimestamp = 
LatestValueFormatter.getTimestamp(rawLatestValueAndTimestamp);
+                if (latestTimestamp <= toTimestamp) {
+                    // latest value satisfies timestamp bound
+                    queryResults.add(new VersionedRecord<>(
+                        
LatestValueFormatter.getValue(rawLatestValueAndTimestamp),
+                        latestTimestamp
+                    ));
+                }
+            }
+
+            // history retention has elapsed and the latest record version (if 
present) does
+            // not satisfy the timestamp bound. return null for 
predictability, even if data
+            // is still present in segments.
+            if (queryResults.size() == 0) {
+                LOG.warn("Returning null for expired get.");
+            }
+            if (!isAscending) {
+                queryResults.sort((r1, r2) -> (int) (r1.timestamp() - 
r2.timestamp()));
+            }
+            return new VersionedRecordIterator<>(queryResults);
+        } else {
+            // first check the latest value store
+            final byte[] rawLatestValueAndTimestamp = 
latestValueStore.get(key);
+            if (rawLatestValueAndTimestamp != null) {
+                final long latestTimestamp = LatestValueFormatter.getTimestamp(
+                    rawLatestValueAndTimestamp);
+                if (latestTimestamp <= toTimestamp) {
+                    queryResults.add(new VersionedRecord<>(
+                        
LatestValueFormatter.getValue(rawLatestValueAndTimestamp),
+                        latestTimestamp));
+                }
+            }
+
+            // check segment stores
+            final List<LogicalKeyValueSegment> segments = 
segmentStores.segments(Long.MIN_VALUE,
+                toTimestamp, false);
+            for (final LogicalKeyValueSegment segment : segments) {
+                final byte[] rawSegmentValue = segment.get(key);
+                if (rawSegmentValue != null) {
+
+                    if 
(RocksDBVersionedStoreSegmentValueFormatter.getNextTimestamp(rawSegmentValue) < 
fromTimestamp) {
+                        // this segment contains no data for the queried time 
range, so earlier segments
+                        // cannot either
+                        break;
+                    }
+
+                    // the desired result is contained in this segment
+                    final List<SegmentSearchResult> searchResults = 
RocksDBVersionedStoreSegmentValueFormatter
+                                                                    
.deserialize(rawSegmentValue)
+                                                                    
.findAll(fromTimestamp, toTimestamp);
+                    for (final SegmentSearchResult searchResult : 
searchResults) {
+                        if (searchResult.value() != null && 
searchResult.validFrom() <= toTimestamp && searchResult.validTo() >= 
fromTimestamp) {

Review Comment:
   When could `searchResult.value()` be `null`? Is it the tombstone case?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##########
@@ -253,6 +259,86 @@ public VersionedRecord<byte[]> get(final Bytes key, final 
long asOfTimestamp) {
         return null;
     }
 
+    // Visible for testing
+    ValueIterator<VersionedRecord<byte[]>> get(final Bytes key, final long 
fromTimestamp, final long toTimestamp, final boolean isAscending) {
+
+        Objects.requireNonNull(key, "key cannot be null");
+        validateStoreOpen();
+
+        final List<VersionedRecord<byte[]>> queryResults = new ArrayList<>();
+
+        if (toTimestamp < observedStreamTime - historyRetention) {
+            // history retention exceeded. we still check the latest value 
store in case the
+            // latest record version satisfies the timestamp bound, in which 
case it should
+            // still be returned (i.e., the latest record version per key 
never expires).
+            final byte[] rawLatestValueAndTimestamp = 
latestValueStore.get(key);
+            if (rawLatestValueAndTimestamp != null) {
+                final long latestTimestamp = 
LatestValueFormatter.getTimestamp(rawLatestValueAndTimestamp);
+                if (latestTimestamp <= toTimestamp) {
+                    // latest value satisfies timestamp bound
+                    queryResults.add(new VersionedRecord<>(
+                        
LatestValueFormatter.getValue(rawLatestValueAndTimestamp),
+                        latestTimestamp
+                    ));
+                }
+            }
+
+            // history retention has elapsed and the latest record version (if 
present) does
+            // not satisfy the timestamp bound. return null for 
predictability, even if data
+            // is still present in segments.
+            if (queryResults.size() == 0) {
+                LOG.warn("Returning null for expired get.");
+            }
+            if (!isAscending) {
+                queryResults.sort((r1, r2) -> (int) (r1.timestamp() - 
r2.timestamp()));
+            }
+            return new VersionedRecordIterator<>(queryResults);
+        } else {
+            // first check the latest value store
+            final byte[] rawLatestValueAndTimestamp = 
latestValueStore.get(key);
+            if (rawLatestValueAndTimestamp != null) {
+                final long latestTimestamp = LatestValueFormatter.getTimestamp(
+                    rawLatestValueAndTimestamp);
+                if (latestTimestamp <= toTimestamp) {
+                    queryResults.add(new VersionedRecord<>(
+                        
LatestValueFormatter.getValue(rawLatestValueAndTimestamp),
+                        latestTimestamp));
+                }
+            }
+
+            // check segment stores
+            final List<LogicalKeyValueSegment> segments = 
segmentStores.segments(Long.MIN_VALUE,

Review Comment:
   Remind me: why can't we pass in `fromTimestamp`? (might be worth adding a 
comment to explain it).



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java:
##########
@@ -335,6 +345,28 @@ private static <R> QueryResult<R> 
runWindowRangeQuery(final Query<R> query,
         }
     }
 
+    @SuppressWarnings("unchecked")
+    private static <R> QueryResult<R> runMultiVersionedKeyQuery(final Query<R> 
query,
+                                                                final 
PositionBound positionBound,
+                                                                final 
QueryConfig config,
+                                                                final 
StateStore store) {
+        if (store instanceof VersionedKeyValueStore) {

Review Comment:
   I think we don't need this check?



##########
streams/src/main/java/org/apache/kafka/streams/query/MultiVersionedKeyQuery.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import java.time.Instant;
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+import org.apache.kafka.streams.state.ValueIterator;
+import org.apache.kafka.streams.state.VersionedRecord;
+
+/**
+ * Interactive query for retrieving a set of records with the same specified 
key and different timestamps within the specified time range.
+ */

Review Comment:
   Should we talk about default ordering in the returned iterator?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java:
##########
@@ -351,6 +383,21 @@ public static <V> Function<byte[], V> 
getDeserializeValue(final StateSerdes<?, V
         return byteArray -> deserializer.deserialize(serdes.topic(), 
byteArray);
     }
 
+    public static <V> ValueIterator<VersionedRecord<V>> 
deserializeValueIterator(final StateSerdes<?, V> serdes,
+        final ValueIterator<VersionedRecord<byte[]>> rawValueIterator) {

Review Comment:
   nit: formatting / indentation



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##########
@@ -253,6 +259,86 @@ public VersionedRecord<byte[]> get(final Bytes key, final 
long asOfTimestamp) {
         return null;
     }
 
+    // Visible for testing
+    ValueIterator<VersionedRecord<byte[]>> get(final Bytes key, final long 
fromTimestamp, final long toTimestamp, final boolean isAscending) {
+
+        Objects.requireNonNull(key, "key cannot be null");
+        validateStoreOpen();
+
+        final List<VersionedRecord<byte[]>> queryResults = new ArrayList<>();

Review Comment:
   Not sure if we should materialize the result in-memory -- if there is a long 
history, it might put a lot of memory pressure on the JVM. Instead, we could 
return an iterator of "segment iterators" (only for segments that contain data, 
or might contain data) and do the iteration "lazy / on-demand" instead.
   
   It's a somewhat open question for a single-key query, but we don't want to 
materialize the result for range-key queries for sure...
   
   (Cf. range queries over windowed stores for which we do the same -- at least 
at a high level -- given how the segment stores work, we might not want to use an 
actual RocksDBIterator -- cf. my top-level review comment)
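   
   To make the "lazy / on-demand" idea concrete, here is a rough sketch under simplifying assumptions. `LazySegmentIterator` is a hypothetical name, each segment is modeled as a plain `Supplier<Iterator<T>>`, and nothing here touches the real segment or RocksDB classes:
   
   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.Collections;
   import java.util.Iterator;
   import java.util.List;
   import java.util.NoSuchElementException;
   import java.util.function.Supplier;
   
   // Hypothetical sketch: chains per-segment iterators and opens each one only when needed.
   class LazySegmentIterator<T> implements Iterator<T> {
       private final Iterator<Supplier<Iterator<T>>> segments;
       private Iterator<T> current = Collections.emptyIterator();
   
       LazySegmentIterator(final List<Supplier<Iterator<T>>> segments) {
           this.segments = segments.iterator();
       }
   
       @Override
       public boolean hasNext() {
           // Move on to the next segment only once the current one is exhausted;
           // segments that are never reached are never read.
           while (!current.hasNext() && segments.hasNext()) {
               current = segments.next().get();
           }
           return current.hasNext();
       }
   
       @Override
       public T next() {
           if (!hasNext()) {
               throw new NoSuchElementException();
           }
           return current.next();
       }
   
       public static void main(final String[] args) {
           final List<Supplier<Iterator<String>>> segments = new ArrayList<>();
           segments.add(() -> Arrays.asList("v3", "v2").iterator());
           segments.add(() -> Collections.<String>emptyIterator());
           segments.add(() -> Arrays.asList("v1").iterator());
   
           final Iterator<String> it = new LazySegmentIterator<>(segments);
           while (it.hasNext()) {
               System.out.println(it.next());
           }
       }
   }
   ```
   
   Only one segment's data is held at a time, so memory use is bounded by the largest segment entry rather than by the full history of the key.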



##########
streams/src/main/java/org/apache/kafka/streams/query/MultiVersionedKeyQuery.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import java.time.Instant;
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+import org.apache.kafka.streams.state.ValueIterator;
+import org.apache.kafka.streams.state.VersionedRecord;
+
+/**
+ * Interactive query for retrieving a set of records with the same specified 
key and different timestamps within the specified time range.
+ */
+@Evolving
+public final class MultiVersionedKeyQuery<K, V> implements 
Query<ValueIterator<VersionedRecord<V>>> {
+
+    private final K key;
+    private final Optional<Instant> fromTime;
+    private final Optional<Instant> toTime;
+    private final boolean isAscending;
+
+    private MultiVersionedKeyQuery(final K key, final Optional<Instant> 
fromTime, final Optional<Instant> toTime, final boolean isAscending) {
+        this.key = Objects.requireNonNull(key);
+        this.fromTime = fromTime;
+        this.toTime = toTime;
+        this.isAscending = isAscending;
+    }
+
+      /**
+       * Creates a query that will retrieve the set of records identified by 
{@code key} if any exists
+       * (or {@code null} otherwise).

Review Comment:
   Should we talk about default ordering?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/VersionedRecordIterator.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.util.List;
+import java.util.NoSuchElementException;
+import org.apache.kafka.streams.state.ValueIterator;
+import org.apache.kafka.streams.state.VersionedRecord;
+
+public class VersionedRecordIterator<V> implements 
ValueIterator<VersionedRecord<V>> {
+
+    protected final List<VersionedRecord<V>> records;
+    protected int currentIndex;
+
+    public VersionedRecordIterator(final List<VersionedRecord<V>> records) {
+        this.records = records;
+        this.currentIndex = 0;
+    }
+
+    @Override
+    public void close() {
+
+    }
+
+    @Override
+    public VersionedRecord<V> peek() {
+        return next();

Review Comment:
   peek() should not advance the iterator, and thus cannot call next().
   
   Can we re-use `AbstractIterator` and only implement `makeNext()` ?
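   
   For reference, a minimal sketch of the contract I mean, assuming a list-backed iterator like the one in this hunk. `ListBackedIterator` is a hypothetical name for illustration only; it does not use Kafka's `AbstractIterator`, which would be the preferred route:
   
   ```java
   import java.util.Arrays;
   import java.util.Iterator;
   import java.util.List;
   import java.util.NoSuchElementException;
   
   // Hypothetical stand-in for illustration; not the PR's VersionedRecordIterator.
   class ListBackedIterator<T> implements Iterator<T> {
       private final List<T> records;
       private int currentIndex = 0;
   
       ListBackedIterator(final List<T> records) {
           this.records = records;
       }
   
       @Override
       public boolean hasNext() {
           return currentIndex < records.size();
       }
   
       // Returns the next element WITHOUT advancing the iterator.
       public T peek() {
           if (!hasNext()) {
               throw new NoSuchElementException();
           }
           return records.get(currentIndex);
       }
   
       // Returns the next element and advances the iterator.
       @Override
       public T next() {
           final T record = peek();
           currentIndex++;
           return record;
       }
   
       public static void main(final String[] args) {
           final ListBackedIterator<String> it =
               new ListBackedIterator<>(Arrays.asList("v1", "v2"));
           System.out.println(it.peek()); // same element that the following next() returns
           System.out.println(it.next());
           System.out.println(it.next());
           System.out.println(it.hasNext());
       }
   }
   ```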



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.


