mjsax commented on code in PR #14626: URL: https://github.com/apache/kafka/pull/14626#discussion_r1379469860
########## streams/src/main/java/org/apache/kafka/streams/query/MultiVersionedKeyQuery.java: ########## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.query; + +import java.time.Instant; +import java.util.Objects; +import java.util.Optional; +import org.apache.kafka.common.annotation.InterfaceStability.Evolving; +import org.apache.kafka.streams.state.ValueIterator; +import org.apache.kafka.streams.state.VersionedRecord; + +/** + * Interactive query for retrieving a set of records with the same specified key and different timestamps within the specified time range. + */ Review Comment: add `@param` for key and value types ########## streams/src/main/java/org/apache/kafka/streams/query/MultiVersionedKeyQuery.java: ########## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.query; + +import java.time.Instant; +import java.util.Objects; +import java.util.Optional; +import org.apache.kafka.common.annotation.InterfaceStability.Evolving; +import org.apache.kafka.streams.state.ValueIterator; +import org.apache.kafka.streams.state.VersionedRecord; + +/** + * Interactive query for retrieving a set of records with the same specified key and different timestamps within the specified time range. + */ +@Evolving +public final class MultiVersionedKeyQuery<K, V> implements Query<ValueIterator<VersionedRecord<V>>> { + + private final K key; + private final Optional<Instant> fromTime; + private final Optional<Instant> toTime; + private final boolean isAscending; + + private MultiVersionedKeyQuery(final K key, final Optional<Instant> fromTime, final Optional<Instant> toTime, final boolean isAscending) { + this.key = Objects.requireNonNull(key); + this.fromTime = fromTime; + this.toTime = toTime; + this.isAscending = isAscending; + } + + /** + * Creates a query that will retrieve the set of records identified by {@code key} if any exists + * (or {@code null} otherwise). Review Comment: Would we ever return `null` -- I think the iterator would just be empty but never be `null`? Can we extend the JavaDocs similar to `VersionedKeyQuery` and talk about default (query full history) vs optionally limit the time range scope? 
########## streams/src/main/java/org/apache/kafka/streams/query/MultiVersionedKeyQuery.java: ########## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.query; + +import java.time.Instant; +import java.util.Objects; +import java.util.Optional; +import org.apache.kafka.common.annotation.InterfaceStability.Evolving; +import org.apache.kafka.streams.state.ValueIterator; +import org.apache.kafka.streams.state.VersionedRecord; + +/** + * Interactive query for retrieving a set of records with the same specified key and different timestamps within the specified time range. + */ +@Evolving +public final class MultiVersionedKeyQuery<K, V> implements Query<ValueIterator<VersionedRecord<V>>> { + + private final K key; + private final Optional<Instant> fromTime; + private final Optional<Instant> toTime; + private final boolean isAscending; + + private MultiVersionedKeyQuery(final K key, final Optional<Instant> fromTime, final Optional<Instant> toTime, final boolean isAscending) { + this.key = Objects.requireNonNull(key); Review Comment: This null check should go into `withKey(...)` (to get flatter stack traces). 
We should also add an error message: `Objects.requireNonNull(key, "key cannot be null");` ########## streams/src/main/java/org/apache/kafka/streams/query/MultiVersionedKeyQuery.java: ########## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.query; + +import java.time.Instant; +import java.util.Objects; +import java.util.Optional; +import org.apache.kafka.common.annotation.InterfaceStability.Evolving; +import org.apache.kafka.streams.state.ValueIterator; +import org.apache.kafka.streams.state.VersionedRecord; + +/** + * Interactive query for retrieving a set of records with the same specified key and different timestamps within the specified time range. 
+ */ +@Evolving +public final class MultiVersionedKeyQuery<K, V> implements Query<ValueIterator<VersionedRecord<V>>> { + + private final K key; + private final Optional<Instant> fromTime; + private final Optional<Instant> toTime; + private final boolean isAscending; + + private MultiVersionedKeyQuery(final K key, final Optional<Instant> fromTime, final Optional<Instant> toTime, final boolean isAscending) { + this.key = Objects.requireNonNull(key); + this.fromTime = fromTime; + this.toTime = toTime; + this.isAscending = isAscending; + } + + /** + * Creates a query that will retrieve the set of records identified by {@code key} if any exists + * (or {@code null} otherwise). + * @param key The key to retrieve + * @throws NullPointerException if @param key is null Review Comment: duplicated below (remove this line): `@throws` should be at the end ########## streams/src/main/java/org/apache/kafka/streams/query/MultiVersionedKeyQuery.java: ########## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.query; + +import java.time.Instant; +import java.util.Objects; +import java.util.Optional; +import org.apache.kafka.common.annotation.InterfaceStability.Evolving; +import org.apache.kafka.streams.state.ValueIterator; +import org.apache.kafka.streams.state.VersionedRecord; + +/** + * Interactive query for retrieving a set of records with the same specified key and different timestamps within the specified time range. + */ +@Evolving +public final class MultiVersionedKeyQuery<K, V> implements Query<ValueIterator<VersionedRecord<V>>> { + + private final K key; + private final Optional<Instant> fromTime; + private final Optional<Instant> toTime; + private final boolean isAscending; + + private MultiVersionedKeyQuery(final K key, final Optional<Instant> fromTime, final Optional<Instant> toTime, final boolean isAscending) { + this.key = Objects.requireNonNull(key); + this.fromTime = fromTime; + this.toTime = toTime; + this.isAscending = isAscending; + } + + /** + * Creates a query that will retrieve the set of records identified by {@code key} if any exists + * (or {@code null} otherwise). + * @param key The key to retrieve + * @throws NullPointerException if @param key is null + * @param <K> The type of the key + * @param <V> The type of the value that will be retrieved + * @throws NullPointerException if @param key is null + */ + public static <K, V> MultiVersionedKeyQuery<K, V> withKey(final K key) { + return new MultiVersionedKeyQuery<>(key, Optional.empty(), Optional.empty(), true); + } + + /** + * Specifies the starting time point for the key query. + * <pre> + * The key query returns all the records that are still existing in the time range starting from the timestamp {@code fromTime}. There can be records which have been inserted before the {@code fromTime} + * and are valid in the query specified time range (the whole time range or even partially). 
The key query in fact returns all the records that have NOT become tombstone at or after {@code fromTime}. + * </pre> + * @param fromTime The starting time point + * If @param fromTime is null, will be considered as negative infinity + */ + public MultiVersionedKeyQuery<K, V> fromTime(final Instant fromTime) { + if (fromTime == null) { + return new MultiVersionedKeyQuery<>(key, Optional.empty(), toTime, true); + } + return new MultiVersionedKeyQuery<>(key, Optional.of(fromTime), toTime, true); + } + + /** + * Specifies the ending time point for the key query. + * The key query returns all the records that have timestamp <= {@code toTime}. + * @param toTime The ending time point + * If @param toTime is null, will be considered as positive infinity + */ + public MultiVersionedKeyQuery<K, V> toTime(final Instant toTime) { + if (toTime == null) { + return new MultiVersionedKeyQuery<>(key, fromTime, Optional.empty(), true); + } + return new MultiVersionedKeyQuery<>(key, fromTime, Optional.of(toTime), true); + } + + /** + * Specifies the order of the returned records by the query as descending by timestamp. + */ + public MultiVersionedKeyQuery<K, V> withDescendingTimestamps() { + return new MultiVersionedKeyQuery<>(key, fromTime, toTime, false); + } + + /** + * Specifies the order of the returned records by the query as ascending by timestamp. + */ + public MultiVersionedKeyQuery<K, V> withAscendingTimestamps() { + return new MultiVersionedKeyQuery<>(key, fromTime, toTime, true); + } + + /** + * The key that was specified for this query. 
+ */ + public K key() { + return key; + } + + /** + * The starting time point of the query, if specified + */ + public Optional<Instant> fromTime() { + return fromTime; + } + + /** + * The ending time point of the query, if specified + */ + public Optional<Instant> toTime() { + return toTime; + } + + /** + * @return true if the query returns records in ascending order of timestamps Review Comment: Add high level method description ########## streams/src/main/java/org/apache/kafka/streams/state/ValueIterator.java: ########## @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state; + +import java.io.Closeable; +import java.util.Iterator; + + +/** + * Iterator interface of {@link VersionedRecord}. + * <p> + * Users must call its {@code close} method explicitly upon completeness to release resources, + * or use try-with-resources statement (available since JDK7) for this {@link Closeable} class. + * Note that {@code remove()} is not supported. 
+ * + * @param <V> Type of values + */ +public interface ValueIterator<V> extends Iterator<V>, Closeable { + + @Override + void close(); + + /** + * Peek the next value without advancing the iterator Review Comment: ```suggestion * Peek the next value without advancing the iterator. ``` ########## streams/src/main/java/org/apache/kafka/streams/state/VersionedRecord.java: ########## @@ -37,8 +38,23 @@ public final class VersionedRecord<V> { public VersionedRecord(final V value, final long timestamp) { this.value = Objects.requireNonNull(value); this.timestamp = timestamp; + this.validTo = Long.MAX_VALUE; } + /** + * Create a new {@link VersionedRecord} instance. {@code value} cannot be {@code null}. + * + * @param value the value + * @param timestamp the timestamp + * @param validTo the latest timestamp that value is valid Review Comment: > is valid Is this correct? I thought it would be the exclusive upper bound of the validity interval? ########## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java: ########## @@ -253,6 +259,86 @@ public VersionedRecord<byte[]> get(final Bytes key, final long asOfTimestamp) { return null; } + // Visible for testing + ValueIterator<VersionedRecord<byte[]>> get(final Bytes key, final long fromTimestamp, final long toTimestamp, final boolean isAscending) { + + Objects.requireNonNull(key, "key cannot be null"); + validateStoreOpen(); + + final List<VersionedRecord<byte[]>> queryResults = new ArrayList<>(); + + if (toTimestamp < observedStreamTime - historyRetention) { + // history retention exceeded. we still check the latest value store in case the + // latest record version satisfies the timestamp bound, in which case it should + // still be returned (i.e., the latest record version per key never expires). 
+ final byte[] rawLatestValueAndTimestamp = latestValueStore.get(key); + if (rawLatestValueAndTimestamp != null) { + final long latestTimestamp = LatestValueFormatter.getTimestamp(rawLatestValueAndTimestamp); Review Comment: nit: should we call it `recordTimestamp` instead of `latestTimestamp`? ########## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java: ########## @@ -253,6 +259,86 @@ public VersionedRecord<byte[]> get(final Bytes key, final long asOfTimestamp) { return null; } + // Visible for testing + ValueIterator<VersionedRecord<byte[]>> get(final Bytes key, final long fromTimestamp, final long toTimestamp, final boolean isAscending) { + + Objects.requireNonNull(key, "key cannot be null"); + validateStoreOpen(); + + final List<VersionedRecord<byte[]>> queryResults = new ArrayList<>(); + + if (toTimestamp < observedStreamTime - historyRetention) { + // history retention exceeded. we still check the latest value store in case the + // latest record version satisfies the timestamp bound, in which case it should + // still be returned (i.e., the latest record version per key never expires). + final byte[] rawLatestValueAndTimestamp = latestValueStore.get(key); + if (rawLatestValueAndTimestamp != null) { + final long latestTimestamp = LatestValueFormatter.getTimestamp(rawLatestValueAndTimestamp); + if (latestTimestamp <= toTimestamp) { + // latest value satisfies timestamp bound + queryResults.add(new VersionedRecord<>( + LatestValueFormatter.getValue(rawLatestValueAndTimestamp), + latestTimestamp + )); + } + } + + // history retention has elapsed and the latest record version (if present) does + // not satisfy the timestamp bound. return null for predictability, even if data + // is still present in segments. 
+ if (queryResults.size() == 0) { + LOG.warn("Returning null for expired get."); + } + if (!isAscending) { + queryResults.sort((r1, r2) -> (int) (r1.timestamp() - r2.timestamp())); + } + return new VersionedRecordIterator<>(queryResults); Review Comment: Btw: I think we can keep this as an optimization (no need to replace this with an "iterator over segment-iterators" approach). ########## streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java: ########## @@ -351,6 +383,21 @@ public static <V> Function<byte[], V> getDeserializeValue(final StateSerdes<?, V return byteArray -> deserializer.deserialize(serdes.topic(), byteArray); } + public static <V> ValueIterator<VersionedRecord<V>> deserializeValueIterator(final StateSerdes<?, V> serdes, + final ValueIterator<VersionedRecord<byte[]>> rawValueIterator) { + + final List<VersionedRecord<V>> versionedRecords = new ArrayList<>(); + while (rawValueIterator.hasNext()) { + final VersionedRecord<byte[]> rawVersionedRecord = rawValueIterator.peek(); + final Deserializer<V> valueDeserializer = serdes.valueDeserializer(); + final long timestamp = rawVersionedRecord.timestamp(); + final long validTo = rawVersionedRecord.validTo(); + final V value = valueDeserializer.deserialize(serdes.topic(), rawVersionedRecord.value()); Review Comment: I think we should not eagerly deserialize the result, but rather wrap the `rawValueIterator` with a `typesIterator` which only deserializes the result that was requested via a `next()` call by the application. 
########## streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java: ########## @@ -157,6 +213,34 @@ protected <R> QueryResult<R> runKeyQuery(final Query<R> query, throw new UnsupportedOperationException("Versioned stores do not support KeyQuery queries at this time."); } + @SuppressWarnings("unchecked") + protected <R> QueryResult<R> runMultiVersionedKeyQuery(final Query<R> query, + final PositionBound positionBound, + final QueryConfig config) { + final QueryResult<R> result; + final MultiVersionedKeyQuery<K, V> typedKeyQuery = (MultiVersionedKeyQuery<K, V>) query; + + final Instant fromTime = typedKeyQuery.fromTime().isPresent() ? typedKeyQuery.fromTime().get() : Instant.ofEpochMilli(Long.MIN_VALUE); + final Instant toTime = typedKeyQuery.toTime().isPresent() ? typedKeyQuery.toTime().get() : Instant.ofEpochMilli(Long.MAX_VALUE); + MultiVersionedKeyQuery<Bytes, byte[]> rawKeyQuery = MultiVersionedKeyQuery.withKey(keyBytes(typedKeyQuery.key())); + rawKeyQuery = rawKeyQuery.fromTime(fromTime).toTime(toTime); + if (!typedKeyQuery.isAscending()) { Review Comment: Avoid double negation ########## streams/src/main/java/org/apache/kafka/streams/query/MultiVersionedKeyQuery.java: ########## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.query; + +import java.time.Instant; +import java.util.Objects; +import java.util.Optional; +import org.apache.kafka.common.annotation.InterfaceStability.Evolving; +import org.apache.kafka.streams.state.ValueIterator; +import org.apache.kafka.streams.state.VersionedRecord; + +/** + * Interactive query for retrieving a set of records with the same specified key and different timestamps within the specified time range. + */ +@Evolving +public final class MultiVersionedKeyQuery<K, V> implements Query<ValueIterator<VersionedRecord<V>>> { + + private final K key; + private final Optional<Instant> fromTime; + private final Optional<Instant> toTime; + private final boolean isAscending; + + private MultiVersionedKeyQuery(final K key, final Optional<Instant> fromTime, final Optional<Instant> toTime, final boolean isAscending) { + this.key = Objects.requireNonNull(key); + this.fromTime = fromTime; + this.toTime = toTime; + this.isAscending = isAscending; + } + + /** + * Creates a query that will retrieve the set of records identified by {@code key} if any exists + * (or {@code null} otherwise). + * @param key The key to retrieve + * @throws NullPointerException if @param key is null + * @param <K> The type of the key + * @param <V> The type of the value that will be retrieved + * @throws NullPointerException if @param key is null + */ + public static <K, V> MultiVersionedKeyQuery<K, V> withKey(final K key) { + return new MultiVersionedKeyQuery<>(key, Optional.empty(), Optional.empty(), true); + } + + /** + * Specifies the starting time point for the key query. + * <pre> + * The key query returns all the records that are still existing in the time range starting from the timestamp {@code fromTime}. 
There can be records which have been inserted before the {@code fromTime} Review Comment: nit: avoid too long lines ########## streams/src/main/java/org/apache/kafka/streams/query/MultiVersionedKeyQuery.java: ########## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.query; + +import java.time.Instant; +import java.util.Objects; +import java.util.Optional; +import org.apache.kafka.common.annotation.InterfaceStability.Evolving; +import org.apache.kafka.streams.state.ValueIterator; +import org.apache.kafka.streams.state.VersionedRecord; + +/** + * Interactive query for retrieving a set of records with the same specified key and different timestamps within the specified time range. 
+ */ +@Evolving +public final class MultiVersionedKeyQuery<K, V> implements Query<ValueIterator<VersionedRecord<V>>> { + + private final K key; + private final Optional<Instant> fromTime; + private final Optional<Instant> toTime; + private final boolean isAscending; + + private MultiVersionedKeyQuery(final K key, final Optional<Instant> fromTime, final Optional<Instant> toTime, final boolean isAscending) { + this.key = Objects.requireNonNull(key); + this.fromTime = fromTime; + this.toTime = toTime; + this.isAscending = isAscending; + } + + /** + * Creates a query that will retrieve the set of records identified by {@code key} if any exists + * (or {@code null} otherwise). + * @param key The key to retrieve + * @throws NullPointerException if @param key is null + * @param <K> The type of the key + * @param <V> The type of the value that will be retrieved + * @throws NullPointerException if @param key is null + */ + public static <K, V> MultiVersionedKeyQuery<K, V> withKey(final K key) { + return new MultiVersionedKeyQuery<>(key, Optional.empty(), Optional.empty(), true); + } + + /** + * Specifies the starting time point for the key query. + * <pre> + * The key query returns all the records that are still existing in the time range starting from the timestamp {@code fromTime}. There can be records which have been inserted before the {@code fromTime} + * and are valid in the query specified time range (the whole time range or even partially). The key query in fact returns all the records that have NOT become tombstone at or after {@code fromTime}. 
+ * </pre> + * @param fromTime The starting time point + * If @param fromTime is null, will be considered as negative infinity + */ + public MultiVersionedKeyQuery<K, V> fromTime(final Instant fromTime) { + if (fromTime == null) { + return new MultiVersionedKeyQuery<>(key, Optional.empty(), toTime, true); + } + return new MultiVersionedKeyQuery<>(key, Optional.of(fromTime), toTime, true); + } + + /** + * Specifies the ending time point for the key query. + * The key query returns all the records that have timestamp <= {@code toTime}. + * @param toTime The ending time point + * If @param toTime is null, will be considered as positive infinity + */ + public MultiVersionedKeyQuery<K, V> toTime(final Instant toTime) { + if (toTime == null) { + return new MultiVersionedKeyQuery<>(key, fromTime, Optional.empty(), true); + } + return new MultiVersionedKeyQuery<>(key, fromTime, Optional.of(toTime), true); + } + + /** + * Specifies the order of the returned records by the query as descending by timestamp. + */ + public MultiVersionedKeyQuery<K, V> withDescendingTimestamps() { + return new MultiVersionedKeyQuery<>(key, fromTime, toTime, false); + } + + /** + * Specifies the order of the returned records by the query as ascending by timestamp. + */ + public MultiVersionedKeyQuery<K, V> withAscendingTimestamps() { + return new MultiVersionedKeyQuery<>(key, fromTime, toTime, true); + } + + /** + * The key that was specified for this query. + */ + public K key() { Review Comment: Add `@return` tag (same below) ########## streams/src/main/java/org/apache/kafka/streams/query/MultiVersionedKeyQuery.java: ########## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.query; + +import java.time.Instant; +import java.util.Objects; +import java.util.Optional; +import org.apache.kafka.common.annotation.InterfaceStability.Evolving; +import org.apache.kafka.streams.state.ValueIterator; +import org.apache.kafka.streams.state.VersionedRecord; + +/** + * Interactive query for retrieving a set of records with the same specified key and different timestamps within the specified time range. + */ +@Evolving +public final class MultiVersionedKeyQuery<K, V> implements Query<ValueIterator<VersionedRecord<V>>> { + + private final K key; + private final Optional<Instant> fromTime; + private final Optional<Instant> toTime; + private final boolean isAscending; + + private MultiVersionedKeyQuery(final K key, final Optional<Instant> fromTime, final Optional<Instant> toTime, final boolean isAscending) { + this.key = Objects.requireNonNull(key); + this.fromTime = fromTime; + this.toTime = toTime; + this.isAscending = isAscending; + } + + /** + * Creates a query that will retrieve the set of records identified by {@code key} if any exists + * (or {@code null} otherwise). 
+ * @param key The key to retrieve + * @throws NullPointerException if @param key is null + * @param <K> The type of the key + * @param <V> The type of the value that will be retrieved + * @throws NullPointerException if @param key is null + */ + public static <K, V> MultiVersionedKeyQuery<K, V> withKey(final K key) { + return new MultiVersionedKeyQuery<>(key, Optional.empty(), Optional.empty(), true); + } + + /** + * Specifies the starting time point for the key query. + * <pre> + * The key query returns all the records that are still existing in the time range starting from the timestamp {@code fromTime}. There can be records which have been inserted before the {@code fromTime} + * and are valid in the query specified time range (the whole time range or even partially). The key query in fact returns all the records that have NOT become tombstone at or after {@code fromTime}. + * </pre> + * @param fromTime The starting time point + * If @param fromTime is null, will be considered as negative infinity + */ + public MultiVersionedKeyQuery<K, V> fromTime(final Instant fromTime) { + if (fromTime == null) { + return new MultiVersionedKeyQuery<>(key, Optional.empty(), toTime, true); + } + return new MultiVersionedKeyQuery<>(key, Optional.of(fromTime), toTime, true); + } + + /** + * Specifies the ending time point for the key query. + * The key query returns all the records that have timestamp <= {@code toTime}. + * @param toTime The ending time point + * If @param toTime is null, will be considered as positive infinity + */ + public MultiVersionedKeyQuery<K, V> toTime(final Instant toTime) { + if (toTime == null) { + return new MultiVersionedKeyQuery<>(key, fromTime, Optional.empty(), true); + } + return new MultiVersionedKeyQuery<>(key, fromTime, Optional.of(toTime), true); + } + + /** + * Specifies the order of the returned records by the query as descending by timestamp. 
+ */ + public MultiVersionedKeyQuery<K, V> withDescendingTimestamps() { + return new MultiVersionedKeyQuery<>(key, fromTime, toTime, false); + } + + /** + * Specifies the order of the returned records by the query as ascending by timestamp. + */ + public MultiVersionedKeyQuery<K, V> withAscendingTimestamps() { Review Comment: Just thinking about consistency again -- for `RangeQuery` (Hanyu's KIP) we did not add this, because it's the default anyway. Should we remove it (and update the KIP accordingly) -- or update Hanyu's KIP to add something similar? ########## streams/src/main/java/org/apache/kafka/streams/state/ValueIterator.java: ########## @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state; + +import java.io.Closeable; +import java.util.Iterator; + + +/** + * Iterator interface of {@link VersionedRecord}. Review Comment: Why "of `VersionedRecord`"? -- The interface itself does not say anything about `VersionedRecord` but takes a generic `V` type... 
########## streams/src/main/java/org/apache/kafka/streams/query/MultiVersionedKeyQuery.java: ########## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.query; + +import java.time.Instant; +import java.util.Objects; +import java.util.Optional; +import org.apache.kafka.common.annotation.InterfaceStability.Evolving; +import org.apache.kafka.streams.state.ValueIterator; +import org.apache.kafka.streams.state.VersionedRecord; + +/** + * Interactive query for retrieving a set of records with the same specified key and different timestamps within the specified time range. 
+ */ +@Evolving +public final class MultiVersionedKeyQuery<K, V> implements Query<ValueIterator<VersionedRecord<V>>> { + + private final K key; + private final Optional<Instant> fromTime; + private final Optional<Instant> toTime; + private final boolean isAscending; + + private MultiVersionedKeyQuery(final K key, final Optional<Instant> fromTime, final Optional<Instant> toTime, final boolean isAscending) { + this.key = Objects.requireNonNull(key); + this.fromTime = fromTime; + this.toTime = toTime; + this.isAscending = isAscending; + } + + /** + * Creates a query that will retrieve the set of records identified by {@code key} if any exists + * (or {@code null} otherwise). + * @param key The key to retrieve Review Comment: nit: add newline before starting with annotations "to retrieve"? We don't retrieve the key, but the values of this key. ########## streams/src/main/java/org/apache/kafka/streams/query/MultiVersionedKeyQuery.java: ########## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.kafka.streams.query; + +import java.time.Instant; +import java.util.Objects; +import java.util.Optional; +import org.apache.kafka.common.annotation.InterfaceStability.Evolving; +import org.apache.kafka.streams.state.ValueIterator; +import org.apache.kafka.streams.state.VersionedRecord; + +/** + * Interactive query for retrieving a set of records with the same specified key and different timestamps within the specified time range. + */ +@Evolving +public final class MultiVersionedKeyQuery<K, V> implements Query<ValueIterator<VersionedRecord<V>>> { + + private final K key; + private final Optional<Instant> fromTime; + private final Optional<Instant> toTime; + private final boolean isAscending; + + private MultiVersionedKeyQuery(final K key, final Optional<Instant> fromTime, final Optional<Instant> toTime, final boolean isAscending) { + this.key = Objects.requireNonNull(key); + this.fromTime = fromTime; + this.toTime = toTime; + this.isAscending = isAscending; + } + + /** + * Creates a query that will retrieve the set of records identified by {@code key} if any exists + * (or {@code null} otherwise). + * @param key The key to retrieve + * @throws NullPointerException if @param key is null + * @param <K> The type of the key + * @param <V> The type of the value that will be retrieved + * @throws NullPointerException if @param key is null + */ + public static <K, V> MultiVersionedKeyQuery<K, V> withKey(final K key) { + return new MultiVersionedKeyQuery<>(key, Optional.empty(), Optional.empty(), true); + } + + /** + * Specifies the starting time point for the key query. + * <pre> + * The key query returns all the records that are still existing in the time range starting from the timestamp {@code fromTime}. There can be records which have been inserted before the {@code fromTime} + * and are valid in the query specified time range (the whole time range or even partially). 
The key query in fact returns all the records that have NOT become tombstone at or after {@code fromTime}. Review Comment: > NOT become tombstone If a value gets an update, it is not tombstoned either but might still be returned. ########## streams/src/main/java/org/apache/kafka/streams/query/MultiVersionedKeyQuery.java: ########## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.query; + +import java.time.Instant; +import java.util.Objects; +import java.util.Optional; +import org.apache.kafka.common.annotation.InterfaceStability.Evolving; +import org.apache.kafka.streams.state.ValueIterator; +import org.apache.kafka.streams.state.VersionedRecord; + +/** + * Interactive query for retrieving a set of records with the same specified key and different timestamps within the specified time range.
+ */ +@Evolving +public final class MultiVersionedKeyQuery<K, V> implements Query<ValueIterator<VersionedRecord<V>>> { + + private final K key; + private final Optional<Instant> fromTime; + private final Optional<Instant> toTime; + private final boolean isAscending; + + private MultiVersionedKeyQuery(final K key, final Optional<Instant> fromTime, final Optional<Instant> toTime, final boolean isAscending) { + this.key = Objects.requireNonNull(key); + this.fromTime = fromTime; + this.toTime = toTime; + this.isAscending = isAscending; + } + + /** + * Creates a query that will retrieve the set of records identified by {@code key} if any exists + * (or {@code null} otherwise). + * @param key The key to retrieve + * @throws NullPointerException if @param key is null + * @param <K> The type of the key + * @param <V> The type of the value that will be retrieved + * @throws NullPointerException if @param key is null + */ + public static <K, V> MultiVersionedKeyQuery<K, V> withKey(final K key) { + return new MultiVersionedKeyQuery<>(key, Optional.empty(), Optional.empty(), true); + } + + /** + * Specifies the starting time point for the key query. + * <pre> Review Comment: Why are you using `<pre>` here? We should use `<p>` (note: for `<p>` no closing tag is required in JavaDocs -- JavaDocs is not the same as HTML even if it's borrowing a lot of stuff from it) ########## streams/src/main/java/org/apache/kafka/streams/query/MultiVersionedKeyQuery.java: ########## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.query; + +import java.time.Instant; +import java.util.Objects; +import java.util.Optional; +import org.apache.kafka.common.annotation.InterfaceStability.Evolving; +import org.apache.kafka.streams.state.ValueIterator; +import org.apache.kafka.streams.state.VersionedRecord; + +/** + * Interactive query for retrieving a set of records with the same specified key and different timestamps within the specified time range. + */ +@Evolving +public final class MultiVersionedKeyQuery<K, V> implements Query<ValueIterator<VersionedRecord<V>>> { + + private final K key; + private final Optional<Instant> fromTime; + private final Optional<Instant> toTime; + private final boolean isAscending; + + private MultiVersionedKeyQuery(final K key, final Optional<Instant> fromTime, final Optional<Instant> toTime, final boolean isAscending) { + this.key = Objects.requireNonNull(key); + this.fromTime = fromTime; + this.toTime = toTime; + this.isAscending = isAscending; + } + + /** + * Creates a query that will retrieve the set of records identified by {@code key} if any exists + * (or {@code null} otherwise). 
+ * @param key The key to retrieve + * @throws NullPointerException if @param key is null + * @param <K> The type of the key + * @param <V> The type of the value that will be retrieved + * @throws NullPointerException if @param key is null + */ + public static <K, V> MultiVersionedKeyQuery<K, V> withKey(final K key) { + return new MultiVersionedKeyQuery<>(key, Optional.empty(), Optional.empty(), true); + } + + /** + * Specifies the starting time point for the key query. + * <pre> + * The key query returns all the records that are still existing in the time range starting from the timestamp {@code fromTime}. There can be records which have been inserted before the {@code fromTime} + * and are valid in the query specified time range (the whole time range or even partially). The key query in fact returns all the records that have NOT become tombstone at or after {@code fromTime}. + * </pre> + * @param fromTime The starting time point + * If @param fromTime is null, will be considered as negative infinity + */ + public MultiVersionedKeyQuery<K, V> fromTime(final Instant fromTime) { + if (fromTime == null) { + return new MultiVersionedKeyQuery<>(key, Optional.empty(), toTime, true); + } + return new MultiVersionedKeyQuery<>(key, Optional.of(fromTime), toTime, true); + } + + /** + * Specifies the ending time point for the key query. + * The key query returns all the records that have timestamp <= {@code toTime}. + * @param toTime The ending time point + * If @param toTime is null, will be considered as positive infinity Review Comment: as above ########## streams/src/main/java/org/apache/kafka/streams/state/ValueIterator.java: ########## @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state; + +import java.io.Closeable; +import java.util.Iterator; + + +/** + * Iterator interface of {@link VersionedRecord}. + * <p> + * Users must call its {@code close} method explicitly upon completeness to release resources, + * or use try-with-resources statement (available since JDK7) for this {@link Closeable} class. + * Note that {@code remove()} is not supported. + * + * @param <V> Type of values + */ +public interface ValueIterator<V> extends Iterator<V>, Closeable { + + @Override + void close(); + + /** + * Peek the next value without advancing the iterator + * @return the value that would be returned from the next call to next Review Comment: ```suggestion * @return The value that would be returned from the next call to {@link #next()}. ``` ########## streams/src/main/java/org/apache/kafka/streams/query/MultiVersionedKeyQuery.java: ########## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.query; + +import java.time.Instant; +import java.util.Objects; +import java.util.Optional; +import org.apache.kafka.common.annotation.InterfaceStability.Evolving; +import org.apache.kafka.streams.state.ValueIterator; +import org.apache.kafka.streams.state.VersionedRecord; + +/** + * Interactive query for retrieving a set of records with the same specified key and different timestamps within the specified time range. + */ +@Evolving +public final class MultiVersionedKeyQuery<K, V> implements Query<ValueIterator<VersionedRecord<V>>> { + + private final K key; + private final Optional<Instant> fromTime; + private final Optional<Instant> toTime; + private final boolean isAscending; + + private MultiVersionedKeyQuery(final K key, final Optional<Instant> fromTime, final Optional<Instant> toTime, final boolean isAscending) { + this.key = Objects.requireNonNull(key); + this.fromTime = fromTime; + this.toTime = toTime; + this.isAscending = isAscending; + } + + /** + * Creates a query that will retrieve the set of records identified by {@code key} if any exists + * (or {@code null} otherwise). 
+ * @param key The key to retrieve + * @throws NullPointerException if @param key is null + * @param <K> The type of the key + * @param <V> The type of the value that will be retrieved + * @throws NullPointerException if @param key is null + */ + public static <K, V> MultiVersionedKeyQuery<K, V> withKey(final K key) { + return new MultiVersionedKeyQuery<>(key, Optional.empty(), Optional.empty(), true); + } + + /** + * Specifies the starting time point for the key query. + * <pre> + * The key query returns all the records that are still existing in the time range starting from the timestamp {@code fromTime}. There can be records which have been inserted before the {@code fromTime} + * and are valid in the query specified time range (the whole time range or even partially). The key query in fact returns all the records that have NOT become tombstone at or after {@code fromTime}. + * </pre> + * @param fromTime The starting time point + * If @param fromTime is null, will be considered as negative infinity + */ + public MultiVersionedKeyQuery<K, V> fromTime(final Instant fromTime) { + if (fromTime == null) { + return new MultiVersionedKeyQuery<>(key, Optional.empty(), toTime, true); + } + return new MultiVersionedKeyQuery<>(key, Optional.of(fromTime), toTime, true); + } + + /** + * Specifies the ending time point for the key query. + * The key query returns all the records that have timestamp <= {@code toTime}. Review Comment: I am flexible, but we should try to be consistent with formatting. In `VersionedKeyQuery` you used ``` timestamp <= toTime. ``` w/o the `{@code} annotation. (also `<=` need escaping) ########## streams/src/main/java/org/apache/kafka/streams/query/MultiVersionedKeyQuery.java: ########## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.query; + +import java.time.Instant; +import java.util.Objects; +import java.util.Optional; +import org.apache.kafka.common.annotation.InterfaceStability.Evolving; +import org.apache.kafka.streams.state.ValueIterator; +import org.apache.kafka.streams.state.VersionedRecord; + +/** + * Interactive query for retrieving a set of records with the same specified key and different timestamps within the specified time range. + */ +@Evolving +public final class MultiVersionedKeyQuery<K, V> implements Query<ValueIterator<VersionedRecord<V>>> { + + private final K key; + private final Optional<Instant> fromTime; + private final Optional<Instant> toTime; + private final boolean isAscending; + + private MultiVersionedKeyQuery(final K key, final Optional<Instant> fromTime, final Optional<Instant> toTime, final boolean isAscending) { + this.key = Objects.requireNonNull(key); + this.fromTime = fromTime; + this.toTime = toTime; + this.isAscending = isAscending; + } + + /** + * Creates a query that will retrieve the set of records identified by {@code key} if any exists + * (or {@code null} otherwise). 
+ * @param key The key to retrieve + * @throws NullPointerException if @param key is null + * @param <K> The type of the key + * @param <V> The type of the value that will be retrieved + * @throws NullPointerException if @param key is null + */ + public static <K, V> MultiVersionedKeyQuery<K, V> withKey(final K key) { + return new MultiVersionedKeyQuery<>(key, Optional.empty(), Optional.empty(), true); + } + + /** + * Specifies the starting time point for the key query. + * <pre> + * The key query returns all the records that are still existing in the time range starting from the timestamp {@code fromTime}. There can be records which have been inserted before the {@code fromTime} + * and are valid in the query specified time range (the whole time range or even partially). The key query in fact returns all the records that have NOT become tombstone at or after {@code fromTime}. + * </pre> + * @param fromTime The starting time point + * If @param fromTime is null, will be considered as negative infinity Review Comment: `@param` is not valid JavaDoc here (use `{@code fromTime}` instead) Maybe also `{@code null}` `as negative infinity, ie, no lower bound` ########## streams/src/main/java/org/apache/kafka/streams/query/MultiVersionedKeyQuery.java: ########## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.query; + +import java.time.Instant; +import java.util.Objects; +import java.util.Optional; +import org.apache.kafka.common.annotation.InterfaceStability.Evolving; +import org.apache.kafka.streams.state.ValueIterator; +import org.apache.kafka.streams.state.VersionedRecord; + +/** + * Interactive query for retrieving a set of records with the same specified key and different timestamps within the specified time range. + */ +@Evolving +public final class MultiVersionedKeyQuery<K, V> implements Query<ValueIterator<VersionedRecord<V>>> { + + private final K key; + private final Optional<Instant> fromTime; + private final Optional<Instant> toTime; + private final boolean isAscending; + + private MultiVersionedKeyQuery(final K key, final Optional<Instant> fromTime, final Optional<Instant> toTime, final boolean isAscending) { + this.key = Objects.requireNonNull(key); + this.fromTime = fromTime; + this.toTime = toTime; + this.isAscending = isAscending; + } + + /** + * Creates a query that will retrieve the set of records identified by {@code key} if any exists + * (or {@code null} otherwise). + * @param key The key to retrieve + * @throws NullPointerException if @param key is null + * @param <K> The type of the key + * @param <V> The type of the value that will be retrieved + * @throws NullPointerException if @param key is null + */ + public static <K, V> MultiVersionedKeyQuery<K, V> withKey(final K key) { + return new MultiVersionedKeyQuery<>(key, Optional.empty(), Optional.empty(), true); + } + + /** + * Specifies the starting time point for the key query. + * <pre> + * The key query returns all the records that are still existing in the time range starting from the timestamp {@code fromTime}. 
There can be records which have been inserted before the {@code fromTime} + * and are valid in the query specified time range (the whole time range or even partially). The key query in fact returns all the records that have NOT become tombstone at or after {@code fromTime}. + * </pre> + * @param fromTime The starting time point + * If @param fromTime is null, will be considered as negative infinity + */ + public MultiVersionedKeyQuery<K, V> fromTime(final Instant fromTime) { + if (fromTime == null) { Review Comment: Don't think we need this check, but can use `Optional.ofNullable(fromTime)` ########## streams/src/main/java/org/apache/kafka/streams/query/MultiVersionedKeyQuery.java: ########## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.query; + +import java.time.Instant; +import java.util.Objects; +import java.util.Optional; +import org.apache.kafka.common.annotation.InterfaceStability.Evolving; +import org.apache.kafka.streams.state.ValueIterator; +import org.apache.kafka.streams.state.VersionedRecord; + +/** + * Interactive query for retrieving a set of records with the same specified key and different timestamps within the specified time range. 
+ */ +@Evolving +public final class MultiVersionedKeyQuery<K, V> implements Query<ValueIterator<VersionedRecord<V>>> { + + private final K key; + private final Optional<Instant> fromTime; + private final Optional<Instant> toTime; + private final boolean isAscending; + + private MultiVersionedKeyQuery(final K key, final Optional<Instant> fromTime, final Optional<Instant> toTime, final boolean isAscending) { + this.key = Objects.requireNonNull(key); + this.fromTime = fromTime; + this.toTime = toTime; + this.isAscending = isAscending; + } + + /** + * Creates a query that will retrieve the set of records identified by {@code key} if any exists + * (or {@code null} otherwise). + * @param key The key to retrieve + * @throws NullPointerException if @param key is null + * @param <K> The type of the key + * @param <V> The type of the value that will be retrieved + * @throws NullPointerException if @param key is null + */ + public static <K, V> MultiVersionedKeyQuery<K, V> withKey(final K key) { + return new MultiVersionedKeyQuery<>(key, Optional.empty(), Optional.empty(), true); + } + + /** + * Specifies the starting time point for the key query. + * <pre> + * The key query returns all the records that are still existing in the time range starting from the timestamp {@code fromTime}. There can be records which have been inserted before the {@code fromTime} + * and are valid in the query specified time range (the whole time range or even partially). The key query in fact returns all the records that have NOT become tombstone at or after {@code fromTime}. 
+ * </pre> + * @param fromTime The starting time point + * If @param fromTime is null, will be considered as negative infinity + */ + public MultiVersionedKeyQuery<K, V> fromTime(final Instant fromTime) { + if (fromTime == null) { + return new MultiVersionedKeyQuery<>(key, Optional.empty(), toTime, true); + } + return new MultiVersionedKeyQuery<>(key, Optional.of(fromTime), toTime, true); + } + + /** + * Specifies the ending time point for the key query. + * The key query returns all the records that have timestamp <= {@code toTime}. + * @param toTime The ending time point + * If @param toTime is null, will be considered as positive infinity + */ + public MultiVersionedKeyQuery<K, V> toTime(final Instant toTime) { + if (toTime == null) { Review Comment: as above ########## streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java: ########## @@ -157,6 +213,34 @@ protected <R> QueryResult<R> runKeyQuery(final Query<R> query, throw new UnsupportedOperationException("Versioned stores do not support KeyQuery queries at this time."); } + @SuppressWarnings("unchecked") + protected <R> QueryResult<R> runMultiVersionedKeyQuery(final Query<R> query, + final PositionBound positionBound, + final QueryConfig config) { + final QueryResult<R> result; + final MultiVersionedKeyQuery<K, V> typedKeyQuery = (MultiVersionedKeyQuery<K, V>) query; + + final Instant fromTime = typedKeyQuery.fromTime().isPresent() ? typedKeyQuery.fromTime().get() : Instant.ofEpochMilli(Long.MIN_VALUE); + final Instant toTime = typedKeyQuery.toTime().isPresent() ? 
typedKeyQuery.toTime().get() : Instant.ofEpochMilli(Long.MAX_VALUE); + MultiVersionedKeyQuery<Bytes, byte[]> rawKeyQuery = MultiVersionedKeyQuery.withKey(keyBytes(typedKeyQuery.key())); + rawKeyQuery = rawKeyQuery.fromTime(fromTime).toTime(toTime); + if (!typedKeyQuery.isAscending()) { Review Comment: ```suggestion if (typedKeyQuery.isDescending()) { ``` ########## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java: ########## @@ -253,6 +259,86 @@ public VersionedRecord<byte[]> get(final Bytes key, final long asOfTimestamp) { return null; } + // Visible for testing + ValueIterator<VersionedRecord<byte[]>> get(final Bytes key, final long fromTimestamp, final long toTimestamp, final boolean isAscending) { + + Objects.requireNonNull(key, "key cannot be null"); + validateStoreOpen(); + + final List<VersionedRecord<byte[]>> queryResults = new ArrayList<>(); + + if (toTimestamp < observedStreamTime - historyRetention) { + // history retention exceeded. we still check the latest value store in case the + // latest record version satisfies the timestamp bound, in which case it should + // still be returned (i.e., the latest record version per key never expires). + final byte[] rawLatestValueAndTimestamp = latestValueStore.get(key); + if (rawLatestValueAndTimestamp != null) { + final long latestTimestamp = LatestValueFormatter.getTimestamp(rawLatestValueAndTimestamp); + if (latestTimestamp <= toTimestamp) { + // latest value satisfies timestamp bound + queryResults.add(new VersionedRecord<>( + LatestValueFormatter.getValue(rawLatestValueAndTimestamp), + latestTimestamp + )); + } + } + + // history retention has elapsed and the latest record version (if present) does + // not satisfy the timestamp bound. return null for predictability, even if data + // is still present in segments. + if (queryResults.size() == 0) { + LOG.warn("Returning null for expired get."); Review Comment: Why do we log a WARNING here? 
Returning nothing seems to be regular execution -- we should not "worry" users about this. ########## streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java: ########## @@ -157,6 +213,34 @@ protected <R> QueryResult<R> runKeyQuery(final Query<R> query, throw new UnsupportedOperationException("Versioned stores do not support KeyQuery queries at this time."); } + @SuppressWarnings("unchecked") + protected <R> QueryResult<R> runMultiVersionedKeyQuery(final Query<R> query, Review Comment: Why do we add this as `protected` ? Should be `private` IMHO. ########## streams/src/main/java/org/apache/kafka/streams/state/VersionedRecord.java: ########## @@ -37,8 +38,23 @@ public final class VersionedRecord<V> { public VersionedRecord(final V value, final long timestamp) { this.value = Objects.requireNonNull(value); this.timestamp = timestamp; + this.validTo = Long.MAX_VALUE; } + /** + * Create a new {@link VersionedRecord} instance. {@code value} cannot be {@code null}. 
+ * + * @param value the value + * @param timestamp the timestamp + * @param validTo the latest timestamp that value is valid + */ + public VersionedRecord(final V value, final long timestamp, final long validTo) { + this.value = Objects.requireNonNull(value); Review Comment: We should add an error message (maybe also fix in the existing constructor) ########## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java: ########## @@ -253,6 +259,86 @@ public VersionedRecord<byte[]> get(final Bytes key, final long asOfTimestamp) { return null; } + // Visible for testing + ValueIterator<VersionedRecord<byte[]>> get(final Bytes key, final long fromTimestamp, final long toTimestamp, final boolean isAscending) { + + Objects.requireNonNull(key, "key cannot be null"); + validateStoreOpen(); + + final List<VersionedRecord<byte[]>> queryResults = new ArrayList<>(); + + if (toTimestamp < observedStreamTime - historyRetention) { + // history retention exceeded. we still check the latest value store in case the + // latest record version satisfies the timestamp bound, in which case it should + // still be returned (i.e., the latest record version per key never expires). + final byte[] rawLatestValueAndTimestamp = latestValueStore.get(key); + if (rawLatestValueAndTimestamp != null) { + final long latestTimestamp = LatestValueFormatter.getTimestamp(rawLatestValueAndTimestamp); + if (latestTimestamp <= toTimestamp) { + // latest value satisfies timestamp bound + queryResults.add(new VersionedRecord<>( + LatestValueFormatter.getValue(rawLatestValueAndTimestamp), + latestTimestamp + )); + } + } + + // history retention has elapsed and the latest record version (if present) does + // not satisfy the timestamp bound. return null for predictability, even if data + // is still present in segments. 
+ if (queryResults.size() == 0) { + LOG.warn("Returning null for expired get."); + } + if (!isAscending) { + queryResults.sort((r1, r2) -> (int) (r1.timestamp() - r2.timestamp())); Review Comment: For this case, we know that the result contains zero or one record -- seems we can skip this one? ########## streams/src/main/java/org/apache/kafka/streams/state/VersionedRecord.java: ########## @@ -47,6 +63,10 @@ public long timestamp() { return timestamp; } + public long validTo() { + return validTo; + } + @Override public String toString() { return "<" + value + "," + timestamp + ">"; Review Comment: Needs an update (same for `equals()` and `hashCode()` below) ########## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java: ########## @@ -253,6 +259,86 @@ public VersionedRecord<byte[]> get(final Bytes key, final long asOfTimestamp) { return null; } + // Visible for testing + ValueIterator<VersionedRecord<byte[]>> get(final Bytes key, final long fromTimestamp, final long toTimestamp, final boolean isAscending) { + + Objects.requireNonNull(key, "key cannot be null"); Review Comment: Do we need this on an internal method (it's not `public`)? ########## streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java: ########## @@ -335,6 +345,28 @@ private static <R> QueryResult<R> runWindowRangeQuery(final Query<R> query, } } + @SuppressWarnings("unchecked") + private static <R> QueryResult<R> runMultiVersionedKeyQuery(final Query<R> query, + final PositionBound positionBound, + final QueryConfig config, + final StateStore store) { + if (store instanceof VersionedKeyValueStore) { + final RocksDBVersionedStore rocksDBVersionedStore = (RocksDBVersionedStore) store; + final MultiVersionedKeyQuery<Bytes, byte[]> rawKeyQuery = (MultiVersionedKeyQuery<Bytes, byte[]>) query; + try { + final long fromTime = rawKeyQuery.fromTime().isPresent() ? 
rawKeyQuery.fromTime().get().toEpochMilli() : Long.MIN_VALUE; Review Comment: Do we need the `isPresent()` check -- seems we do this already in the "metered layer" -- seems redundant? ########## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java: ########## @@ -253,6 +259,86 @@ public VersionedRecord<byte[]> get(final Bytes key, final long asOfTimestamp) { return null; } + // Visible for testing + ValueIterator<VersionedRecord<byte[]>> get(final Bytes key, final long fromTimestamp, final long toTimestamp, final boolean isAscending) { + + Objects.requireNonNull(key, "key cannot be null"); + validateStoreOpen(); + + final List<VersionedRecord<byte[]>> queryResults = new ArrayList<>(); + + if (toTimestamp < observedStreamTime - historyRetention) { + // history retention exceeded. we still check the latest value store in case the + // latest record version satisfies the timestamp bound, in which case it should + // still be returned (i.e., the latest record version per key never expires). + final byte[] rawLatestValueAndTimestamp = latestValueStore.get(key); + if (rawLatestValueAndTimestamp != null) { + final long latestTimestamp = LatestValueFormatter.getTimestamp(rawLatestValueAndTimestamp); + if (latestTimestamp <= toTimestamp) { + // latest value satisfies timestamp bound + queryResults.add(new VersionedRecord<>( + LatestValueFormatter.getValue(rawLatestValueAndTimestamp), + latestTimestamp + )); + } + } + + // history retention has elapsed and the latest record version (if present) does + // not satisfy the timestamp bound. return null for predictability, even if data + // is still present in segments. 
+ if (queryResults.size() == 0) { + LOG.warn("Returning null for expired get."); + } + if (!isAscending) { + queryResults.sort((r1, r2) -> (int) (r1.timestamp() - r2.timestamp())); + } + return new VersionedRecordIterator<>(queryResults); + } else { + // first check the latest value store + final byte[] rawLatestValueAndTimestamp = latestValueStore.get(key); + if (rawLatestValueAndTimestamp != null) { + final long latestTimestamp = LatestValueFormatter.getTimestamp( + rawLatestValueAndTimestamp); + if (latestTimestamp <= toTimestamp) { + queryResults.add(new VersionedRecord<>( + LatestValueFormatter.getValue(rawLatestValueAndTimestamp), + latestTimestamp)); + } + } + + // check segment stores + final List<LogicalKeyValueSegment> segments = segmentStores.segments(Long.MIN_VALUE, + toTimestamp, false); + for (final LogicalKeyValueSegment segment : segments) { + final byte[] rawSegmentValue = segment.get(key); + if (rawSegmentValue != null) { + + if (RocksDBVersionedStoreSegmentValueFormatter.getNextTimestamp(rawSegmentValue) < fromTimestamp) { + // this segment contains no data for the queried time range, so earlier segments + // cannot either + break; + } + + // the desired result is contained in this segment Review Comment: Sounds like if this segment contains the full result... 
``` // this segment contains data belonging to the result ``` ########## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java: ########## @@ -253,6 +259,86 @@ public VersionedRecord<byte[]> get(final Bytes key, final long asOfTimestamp) { return null; } + // Visible for testing + ValueIterator<VersionedRecord<byte[]>> get(final Bytes key, final long fromTimestamp, final long toTimestamp, final boolean isAscending) { + + Objects.requireNonNull(key, "key cannot be null"); + validateStoreOpen(); + + final List<VersionedRecord<byte[]>> queryResults = new ArrayList<>(); + + if (toTimestamp < observedStreamTime - historyRetention) { + // history retention exceeded. we still check the latest value store in case the + // latest record version satisfies the timestamp bound, in which case it should + // still be returned (i.e., the latest record version per key never expires). + final byte[] rawLatestValueAndTimestamp = latestValueStore.get(key); + if (rawLatestValueAndTimestamp != null) { + final long latestTimestamp = LatestValueFormatter.getTimestamp(rawLatestValueAndTimestamp); + if (latestTimestamp <= toTimestamp) { + // latest value satisfies timestamp bound + queryResults.add(new VersionedRecord<>( + LatestValueFormatter.getValue(rawLatestValueAndTimestamp), + latestTimestamp + )); + } + } + + // history retention has elapsed and the latest record version (if present) does + // not satisfy the timestamp bound. return null for predictability, even if data + // is still present in segments. 
+ if (queryResults.size() == 0) { + LOG.warn("Returning null for expired get."); + } + if (!isAscending) { + queryResults.sort((r1, r2) -> (int) (r1.timestamp() - r2.timestamp())); + } + return new VersionedRecordIterator<>(queryResults); + } else { + // first check the latest value store + final byte[] rawLatestValueAndTimestamp = latestValueStore.get(key); + if (rawLatestValueAndTimestamp != null) { + final long latestTimestamp = LatestValueFormatter.getTimestamp( + rawLatestValueAndTimestamp); + if (latestTimestamp <= toTimestamp) { + queryResults.add(new VersionedRecord<>( + LatestValueFormatter.getValue(rawLatestValueAndTimestamp), + latestTimestamp)); + } + } + + // check segment stores + final List<LogicalKeyValueSegment> segments = segmentStores.segments(Long.MIN_VALUE, + toTimestamp, false); + for (final LogicalKeyValueSegment segment : segments) { + final byte[] rawSegmentValue = segment.get(key); + if (rawSegmentValue != null) { + + if (RocksDBVersionedStoreSegmentValueFormatter.getNextTimestamp(rawSegmentValue) < fromTimestamp) { + // this segment contains no data for the queried time range, so earlier segments + // cannot either + break; + } + + // the desired result is contained in this segment + final List<SegmentSearchResult> searchResults = RocksDBVersionedStoreSegmentValueFormatter + .deserialize(rawSegmentValue) + .findAll(fromTimestamp, toTimestamp); + for (final SegmentSearchResult searchResult : searchResults) { + if (searchResult.value() != null && searchResult.validFrom() <= toTimestamp && searchResult.validTo() >= fromTimestamp) { Review Comment: When could `searchResult.value()` be `null`? Is it the tombstone case? 
########## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java: ########## @@ -253,6 +259,86 @@ public VersionedRecord<byte[]> get(final Bytes key, final long asOfTimestamp) { return null; } + // Visible for testing + ValueIterator<VersionedRecord<byte[]>> get(final Bytes key, final long fromTimestamp, final long toTimestamp, final boolean isAscending) { + + Objects.requireNonNull(key, "key cannot be null"); + validateStoreOpen(); + + final List<VersionedRecord<byte[]>> queryResults = new ArrayList<>(); + + if (toTimestamp < observedStreamTime - historyRetention) { + // history retention exceeded. we still check the latest value store in case the + // latest record version satisfies the timestamp bound, in which case it should + // still be returned (i.e., the latest record version per key never expires). + final byte[] rawLatestValueAndTimestamp = latestValueStore.get(key); + if (rawLatestValueAndTimestamp != null) { + final long latestTimestamp = LatestValueFormatter.getTimestamp(rawLatestValueAndTimestamp); + if (latestTimestamp <= toTimestamp) { + // latest value satisfies timestamp bound + queryResults.add(new VersionedRecord<>( + LatestValueFormatter.getValue(rawLatestValueAndTimestamp), + latestTimestamp + )); + } + } + + // history retention has elapsed and the latest record version (if present) does + // not satisfy the timestamp bound. return null for predictability, even if data + // is still present in segments. 
+ if (queryResults.size() == 0) { + LOG.warn("Returning null for expired get."); + } + if (!isAscending) { + queryResults.sort((r1, r2) -> (int) (r1.timestamp() - r2.timestamp())); + } + return new VersionedRecordIterator<>(queryResults); + } else { + // first check the latest value store + final byte[] rawLatestValueAndTimestamp = latestValueStore.get(key); + if (rawLatestValueAndTimestamp != null) { + final long latestTimestamp = LatestValueFormatter.getTimestamp( + rawLatestValueAndTimestamp); + if (latestTimestamp <= toTimestamp) { + queryResults.add(new VersionedRecord<>( + LatestValueFormatter.getValue(rawLatestValueAndTimestamp), + latestTimestamp)); + } + } + + // check segment stores + final List<LogicalKeyValueSegment> segments = segmentStores.segments(Long.MIN_VALUE, Review Comment: Remind me: why can't we pass in `fromTimestamp`? (might be worth to add a comment to explain it). ########## streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java: ########## @@ -335,6 +345,28 @@ private static <R> QueryResult<R> runWindowRangeQuery(final Query<R> query, } } + @SuppressWarnings("unchecked") + private static <R> QueryResult<R> runMultiVersionedKeyQuery(final Query<R> query, + final PositionBound positionBound, + final QueryConfig config, + final StateStore store) { + if (store instanceof VersionedKeyValueStore) { Review Comment: I think we don't need this check? ########## streams/src/main/java/org/apache/kafka/streams/query/MultiVersionedKeyQuery.java: ########## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.query; + +import java.time.Instant; +import java.util.Objects; +import java.util.Optional; +import org.apache.kafka.common.annotation.InterfaceStability.Evolving; +import org.apache.kafka.streams.state.ValueIterator; +import org.apache.kafka.streams.state.VersionedRecord; + +/** + * Interactive query for retrieving a set of records with the same specified key and different timestamps within the specified time range. + */ Review Comment: Should we talk about default ordering in the returned iterator? ########## streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java: ########## @@ -351,6 +383,21 @@ public static <V> Function<byte[], V> getDeserializeValue(final StateSerdes<?, V return byteArray -> deserializer.deserialize(serdes.topic(), byteArray); } + public static <V> ValueIterator<VersionedRecord<V>> deserializeValueIterator(final StateSerdes<?, V> serdes, + final ValueIterator<VersionedRecord<byte[]>> rawValueIterator) { Review Comment: nit: formatting / indention ########## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java: ########## @@ -253,6 +259,86 @@ public VersionedRecord<byte[]> get(final Bytes key, final long asOfTimestamp) { return null; } + // Visible for testing + ValueIterator<VersionedRecord<byte[]>> get(final Bytes key, final long fromTimestamp, final long toTimestamp, final boolean isAscending) { + + Objects.requireNonNull(key, "key cannot be null"); + validateStoreOpen(); + + final List<VersionedRecord<byte[]>> queryResults = 
new ArrayList<>(); Review Comment: Not sure if we should materialize the result in-memory -- if there is a long history, it might put a lot of memory pressure on the JVM. -- Instead, we could return an Iterator of "segment iterators" (only for segments that contain data, or might contain data) and do the iteration "lazy / on-demand" instead. It's a somewhat open question for a single-key query, but we don't want to materialize the result for range-key queries for sure... (Cf. range queries over windowed stores for which we do the same -- at least high level -- given how the segment stores work, we might not want to use an actual RocksDBIterator -- cf my top level review comment) ########## streams/src/main/java/org/apache/kafka/streams/query/MultiVersionedKeyQuery.java: ########## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.query; + +import java.time.Instant; +import java.util.Objects; +import java.util.Optional; +import org.apache.kafka.common.annotation.InterfaceStability.Evolving; +import org.apache.kafka.streams.state.ValueIterator; +import org.apache.kafka.streams.state.VersionedRecord; + +/** + * Interactive query for retrieving a set of records with the same specified key and different timestamps within the specified time range. + */ +@Evolving +public final class MultiVersionedKeyQuery<K, V> implements Query<ValueIterator<VersionedRecord<V>>> { + + private final K key; + private final Optional<Instant> fromTime; + private final Optional<Instant> toTime; + private final boolean isAscending; + + private MultiVersionedKeyQuery(final K key, final Optional<Instant> fromTime, final Optional<Instant> toTime, final boolean isAscending) { + this.key = Objects.requireNonNull(key); + this.fromTime = fromTime; + this.toTime = toTime; + this.isAscending = isAscending; + } + + /** + * Creates a query that will retrieve the set of records identified by {@code key} if any exists + * (or {@code null} otherwise). Review Comment: Should we talk about default ordering? ########## streams/src/main/java/org/apache/kafka/streams/state/internals/VersionedRecordIterator.java: ########## @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import java.util.List; +import java.util.NoSuchElementException; +import org.apache.kafka.streams.state.ValueIterator; +import org.apache.kafka.streams.state.VersionedRecord; + +public class VersionedRecordIterator<V> implements ValueIterator<VersionedRecord<V>> { + + protected final List<VersionedRecord<V>> records; + protected int currentIndex; + + public VersionedRecordIterator(final List<VersionedRecord<V>> records) { + this.records = records; + this.currentIndex = 0; + } + + @Override + public void close() { + + } + + @Override + public VersionedRecord<V> peek() { + return next(); Review Comment: peek() should not advance the iterator, and thus cannot call next(). Can we re-use `AbstractIterator` and only implement `makeNext()` ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org