This is an automated email from the ASF dual-hosted git repository. vvcephei pushed a commit to branch iqv2-framework in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 3e308374fc1289c69e28acc0a8a96fa35c3d56fd Author: John Roesler <[email protected]> AuthorDate: Thu Oct 14 12:53:11 2021 -0500 POC of the KIP-796 IQv2 proposal fix test Prototype of using KeyQuery against the Windowed store updates --- .../org/apache/kafka/streams/KafkaStreams.java | 125 +++++- .../apache/kafka/streams/processor/StateStore.java | 25 ++ .../kafka/streams/processor/StateStoreContext.java | 4 +- .../apache/kafka/streams/query/FailureReason.java | 44 ++ .../streams/query/InteractiveQueryRequest.java | 157 +++++++ .../streams/query/InteractiveQueryResult.java | 72 ++++ .../org/apache/kafka/streams/query/Iterators.java | 82 ++++ .../KeyQuery.java} | 26 +- .../org/apache/kafka/streams/query/Position.java | 192 +++++++++ .../apache/kafka/streams/query/PositionBound.java | 80 ++++ .../Query.java} | 16 +- .../apache/kafka/streams/query/QueryResult.java | 150 +++++++ .../RawKeyQuery.java} | 32 +- .../RawScanQuery.java} | 21 +- .../apache/kafka/streams/state/StateSerdes.java | 9 + .../AbstractRocksDBSegmentedBytesStore.java | 18 + .../state/internals/InMemoryKeyValueStore.java | 12 + .../InMemoryTimeOrderedKeyValueBuffer.java | 12 + .../streams/state/internals/MemoryLRUCache.java | 12 + .../state/internals/MeteredKeyValueStore.java | 41 ++ .../state/internals/MeteredSessionStore.java | 4 + .../state/internals/MeteredWindowStore.java | 117 ++++-- .../state/internals/QueryableStoreProvider.java | 5 + .../streams/state/internals/RocksDBStore.java | 42 ++ .../streams/state/internals/StoreQueryUtils.java | 122 ++++++ .../internals/TimestampedKeyValueStoreBuilder.java | 15 + .../internals/TimestampedWindowStoreBuilder.java | 15 + .../state/internals/ValueAndTimestampSerde.java | 10 + .../WindowToTimestampedWindowByteStoreAdapter.java | 12 + .../streams/state/internals/WrappedStateStore.java | 18 + .../streams/integration/IQv2IntegrationTest.java | 460 +++++++++++++++++++++ .../streams/state/internals/RocksDBStoreTest.java | 3 + 32 files changed, 1867 
insertions(+), 86 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index bb0c40a..2356096 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -27,6 +27,7 @@ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.annotation.InterfaceStability.Evolving; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.metrics.JmxReporter; import org.apache.kafka.common.metrics.KafkaMetricsContext; @@ -40,29 +41,38 @@ import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.errors.InvalidStateStoreException; +import org.apache.kafka.streams.errors.InvalidStateStorePartitionException; import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.StreamsNotStartedException; import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler; import org.apache.kafka.streams.errors.UnknownStateStoreException; -import org.apache.kafka.streams.errors.InvalidStateStorePartitionException; import org.apache.kafka.streams.internals.metrics.ClientMetrics; import org.apache.kafka.streams.processor.StateRestoreListener; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StreamPartitioner; +import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.ClientUtils; import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier; import 
org.apache.kafka.streams.processor.internals.GlobalStreamThread; -import org.apache.kafka.streams.processor.internals.TopologyMetadata; import org.apache.kafka.streams.processor.internals.StateDirectory; import org.apache.kafka.streams.processor.internals.StreamThread; import org.apache.kafka.streams.processor.internals.StreamsMetadataState; import org.apache.kafka.streams.processor.internals.Task; import org.apache.kafka.streams.processor.internals.ThreadStateTransitionValidator; +import org.apache.kafka.streams.processor.internals.TopologyMetadata; import org.apache.kafka.streams.processor.internals.assignment.AssignorError; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.query.InteractiveQueryRequest; +import org.apache.kafka.streams.query.InteractiveQueryResult; +import org.apache.kafka.streams.query.PositionBound; +import org.apache.kafka.streams.query.QueryResult; import org.apache.kafka.streams.state.HostInfo; +import org.apache.kafka.streams.state.StateSerdes; import org.apache.kafka.streams.state.internals.GlobalStateStoreProvider; +import org.apache.kafka.streams.state.internals.MeteredKeyValueStore; +import org.apache.kafka.streams.state.internals.MeteredSessionStore; +import org.apache.kafka.streams.state.internals.MeteredWindowStore; import org.apache.kafka.streams.state.internals.QueryableStoreProvider; import org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider; import org.slf4j.Logger; @@ -78,17 +88,18 @@ import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Objects; import java.util.Optional; import java.util.Properties; import java.util.Set; import java.util.TreeMap; import java.util.UUID; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import 
java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -1716,4 +1727,112 @@ public class KafkaStreams implements AutoCloseable { return Collections.unmodifiableMap(localStorePartitionLags); } + + public <K, V> StateSerdes<K, V> serdesForStore(final String storeName) { + if (!topologyMetadata.hasStore(storeName)) { + throw new UnknownStateStoreException( + "Cannot get state store " + storeName + " because no such store is registered in the topology." + ); + } + + // TODO this is a hack. We ought to be able to create the serdes independent of the + // TODO stores and cache them in the topology. + final Map<String, StateStore> globalStateStores = topologyMetadata.globalStateStores(); + if (globalStateStores.containsKey(storeName)) { + final StateStore store = globalStateStores.get(storeName); + return getSerdes(store); + } else { + for (final StreamThread thread : threads) { + final Map<TaskId, Task> tasks = thread.allTasks(); + for (final Entry<TaskId, Task> entry : tasks.entrySet()) { + final StateStore store = entry.getValue().getStore(storeName); + if (store != null) { + return getSerdes(store); + } + } + } + } + // there may be no local copy of this store. + // This is the main reason I want to decouple serde + // creation from the store itself. 
+ return null; + } + + @SuppressWarnings("unchecked") + private <V, K> StateSerdes<K, V> getSerdes(final StateStore store) { + if (store instanceof MeteredKeyValueStore) { + return ((MeteredKeyValueStore<K, V>) store).serdes(); + } else if (store instanceof MeteredSessionStore) { + return ((MeteredSessionStore<K, V>) store).serdes(); + } else if (store instanceof MeteredWindowStore) { + return ((MeteredWindowStore<K, V>) store).serdes(); + } else { + throw new IllegalArgumentException("Unknown store type: " + store); + } + } + + @Evolving + public <R> InteractiveQueryResult<R> query(final InteractiveQueryRequest<R> request) { + final String storeName = request.getStoreName(); + if (!topologyMetadata.hasStore(storeName)) { + throw new UnknownStateStoreException( + "Cannot get state store " + storeName + " because no such store is registered in the topology." + ); + } + final InteractiveQueryResult<R> result = new InteractiveQueryResult<>(new HashMap<>()); + + final Map<String, StateStore> globalStateStores = topologyMetadata.globalStateStores(); + if (globalStateStores.containsKey(storeName)) { + final StateStore store = globalStateStores.get(storeName); + final QueryResult<R> r = + store.query( + request.getQuery(), + request.getPositionBound(), + request.executionInfoEnabled() + ); + result.setGlobalResult(r); + } else { + for (final StreamThread thread : threads) { + final Map<TaskId, Task> tasks = thread.allTasks(); + for (final Entry<TaskId, Task> entry : tasks.entrySet()) { + + final TaskId taskId = entry.getKey(); + final int partition = taskId.partition(); + if (request.isAllPartitions() + || request.getPartitions().contains(partition)) { + final Task task = entry.getValue(); + final StateStore store = task.getStore(storeName); + if (store != null) { + final StreamThread.State state = thread.state(); + final boolean active = task.isActive(); + if (request.isRequireActive() + && (state != StreamThread.State.RUNNING + || !active)) { + result.addResult( + 
partition, + QueryResult.notActive( + state, + active, + partition + ) + ); + } else { + final QueryResult<R> r = store.query( + request.getQuery(), + request.isRequireActive() + ? PositionBound.unbounded() + : request.getPositionBound(), + request.executionInfoEnabled() + ); + result.addResult(partition, r); + } + } + } + } + } + } + + return result; + } + } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java index 76d1ab4..b1c480f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java @@ -19,6 +19,10 @@ package org.apache.kafka.streams.processor; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.internals.StoreToProcessorContextAdapter; +import org.apache.kafka.streams.query.FailureReason; +import org.apache.kafka.streams.query.PositionBound; +import org.apache.kafka.streams.query.Query; +import org.apache.kafka.streams.query.QueryResult; /** * A storage engine for managing state maintained by a stream processor. @@ -119,4 +123,25 @@ public interface StateStore { * @return {@code true} if the store is open */ boolean isOpen(); + + /** + * Execute a query. Returns a QueryResult containing either result data or + * a failure. + * <p> + * If the store doesn't know how to handle the given query, the result + * will be a {@link FailureReason#UNKNOWN_QUERY_TYPE}. + * If the store couldn't satisfy the given position bound, the result + * will be a {@link FailureReason#NOT_UP_TO_BOUND}. 
+ * @param query The query to execute + * @param positionBound The position the store must be at or past + * @param collectExecutionInfo Whether the store should collect detailed execution info for the query + * @param <R> The result type + */ + default <R> QueryResult<R> query( + Query<R> query, + PositionBound positionBound, + boolean collectExecutionInfo) { + + return QueryResult.forUnknownQueryType(query, this); + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreContext.java index 50d5879..f6f1446 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreContext.java @@ -46,8 +46,8 @@ public interface StateStoreContext { /** * Return the metadata of the current topic/partition/offset if available. - * This is defined as the metadata of the record that is currently been - * processed by the StreamTask that holds the store. + * This is defined as the metadata of the record that is currently being + * processed (or was last processed) by the StreamTask that holds the store. * <p> * Note that the metadata is not defined during all store interactions, for * example, while the StreamTask is running a punctuation. diff --git a/streams/src/main/java/org/apache/kafka/streams/query/FailureReason.java b/streams/src/main/java/org/apache/kafka/streams/query/FailureReason.java new file mode 100644 index 0000000..02db7fe --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/query/FailureReason.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.query; + + +public enum FailureReason { + /** + * Failure indicating that the store doesn't know how to handle the given query. + */ + UNKNOWN_QUERY_TYPE, + + /** + * Failure indicating that the store partition is not (yet) up to the desired bound. + * The caller should either try again later or try a different replica. + */ + NOT_UP_TO_BOUND, + + /** + * Failure indicating that the requested store partition is not present on the local + * KafkaStreams instance. It may have been migrated to another instance during a rebalance. + * The caller is recommended to try a different replica. + */ + NOT_PRESENT, + + /** + * The requested store partition does not exist at all. For example, partition 4 was requested, + * but the store in question only has 4 partitions (0 through 3). + */ + DOES_NOT_EXIST; +} \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/query/InteractiveQueryRequest.java b/streams/src/main/java/org/apache/kafka/streams/query/InteractiveQueryRequest.java new file mode 100644 index 0000000..56fc0a8 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/query/InteractiveQueryRequest.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.query; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; + +/** + * @param <R> + */ +public class InteractiveQueryRequest<R> { + + private final String storeName; + private final PositionBound position; + private final Optional<Set<Integer>> partitions; + private final Query<R> query; + private boolean executionInfoEnabled; + private boolean requireActive; + + private InteractiveQueryRequest( + final String storeName, + final PositionBound position, + final Optional<Set<Integer>> partitions, + final Query<R> query, + final boolean executionInfoEnabled, final boolean requireActive) { + + this.storeName = storeName; + this.position = position; + this.partitions = partitions; + this.query = query; + this.executionInfoEnabled = executionInfoEnabled; + this.requireActive = requireActive; + } + + public static InStore inStore(final String name) { + return new InStore(name); + } + + public InteractiveQueryRequest<R> withPositionBound(final PositionBound positionBound) { + return new InteractiveQueryRequest<>( + storeName, + positionBound, + partitions, + query, + executionInfoEnabled, + false); + } + + + public InteractiveQueryRequest<R> withNoPartitions() { + 
return new InteractiveQueryRequest<>(storeName, + position, + Optional.of(Collections.emptySet()), + query, + executionInfoEnabled, + requireActive); + } + + public InteractiveQueryRequest<R> withAllPartitions() { + return new InteractiveQueryRequest<>(storeName, + position, + Optional.empty(), + query, + executionInfoEnabled, + requireActive); + } + + public InteractiveQueryRequest<R> withPartitions(final Set<Integer> partitions) { + return new InteractiveQueryRequest<>(storeName, + position, + Optional.of(Collections.unmodifiableSet(new HashSet<>(partitions))), + query, + executionInfoEnabled, + requireActive); + } + + public String getStoreName() { + return storeName; + } + + public PositionBound getPositionBound() { + if (requireActive) { + throw new IllegalArgumentException(); + } + return Objects.requireNonNull(position); + } + + public Query<R> getQuery() { + return query; + } + + public boolean isAllPartitions() { + return !partitions.isPresent(); + } + + public Set<Integer> getPartitions() { + if (!partitions.isPresent()) { + throw new UnsupportedOperationException( + "Cannot list partitions of an 'all partitions' request"); + } else { + return partitions.get(); + } + } + + public InteractiveQueryRequest<R> enableExecutionInfo() { + return new InteractiveQueryRequest<>(storeName, + position, + partitions, + query, + true, + requireActive); + } + + public boolean executionInfoEnabled() { + return executionInfoEnabled; + } + + public boolean isRequireActive() { + return requireActive; + } + + public static class InStore { + + private final String name; + + private InStore(final String name) { + this.name = name; + } + + public <R> InteractiveQueryRequest<R> withQuery(final Query<R> query) { + return new InteractiveQueryRequest<>( + name, + null, + Optional.empty(), + query, + false, + true); + } + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/query/InteractiveQueryResult.java 
b/streams/src/main/java/org/apache/kafka/streams/query/InteractiveQueryResult.java new file mode 100644 index 0000000..4f8e728 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/query/InteractiveQueryResult.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.query; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class InteractiveQueryResult<R> { + + private final Map<Integer, QueryResult<R>> partitionResults; + + public InteractiveQueryResult(final Map<Integer, QueryResult<R>> resultMap) { + partitionResults = resultMap; + } + + public void setGlobalResult(final QueryResult<R> r) { + + } + + public void addResult(final int partition, final QueryResult<R> r) { + partitionResults.put(partition, r); + } + + public Map<Integer, QueryResult<R>> getPartitionResults() { + return partitionResults; + } + + public QueryResult<R> getOnlyPartitionResult() { + final List<QueryResult<R>> nonempty = + partitionResults + .values() + .stream() + .filter(r -> r.getResult() != null) + .collect(Collectors.toList()); + + if (nonempty.size() != 1) { + throw new IllegalStateException(); + } else { + return nonempty.get(0); + } + } + + public Position getPosition() { + Position position = Position.emptyPosition(); + for (final QueryResult<R> r : partitionResults.values()) { + position = position.merge(r.getPosition()); + } + return position; + } + + @Override + public String toString() { + return "InteractiveQueryResult{" + + "partitionResults=" + partitionResults + + '}'; + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/query/Iterators.java b/streams/src/main/java/org/apache/kafka/streams/query/Iterators.java new file mode 100644 index 0000000..d7ea7e9 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/query/Iterators.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.query; + +import org.apache.kafka.common.utils.CloseableIterator; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Collection; +import java.util.Deque; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.Objects; + +public final class Iterators { + + private Iterators() { + } + + public static <E, I extends Iterator<E>> CloseableIterator<E> collate(final Collection<I> iterators) { + return new CloseableIterator<E>() { + private final Deque<I> iteratorQueue = new LinkedList<>(iterators); + + @Override + public void close() { + RuntimeException exception = null; + for (final I iterator : iterators) { + if (iterator instanceof Closeable) { + try { + ((Closeable) iterator).close(); + } catch (final IOException e) { + if (exception == null) { + exception = new RuntimeException( + "Exception closing collated iterator", e); + } else { + exception.addSuppressed(e); + } + } + } + } + if (exception != null) { + throw exception; + } + } + + @Override + public boolean hasNext() { + for (int i = 0; i < iterators.size(); i++) { + final Iterator<E> iterator = Objects.requireNonNull(iteratorQueue.peek()); + if (iterator.hasNext()) { + return true; + } else { + iteratorQueue.push(iteratorQueue.poll()); + } + } + return false; + } + + @Override + public E next() { + final I iterator = iteratorQueue.poll(); + final E next = Objects.requireNonNull(iterator).next(); + iteratorQueue.push(iterator); + return next; + } + }; + } +} diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java b/streams/src/main/java/org/apache/kafka/streams/query/KeyQuery.java similarity index 56% copy from streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java copy to streams/src/main/java/org/apache/kafka/streams/query/KeyQuery.java index 1936d29..3900739 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java +++ b/streams/src/main/java/org/apache/kafka/streams/query/KeyQuery.java @@ -14,19 +14,21 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.kafka.streams.state.internals; +package org.apache.kafka.streams.query; -import org.apache.kafka.common.serialization.Serde; -import org.apache.kafka.streams.kstream.internals.WrappingNullableSerde; -import org.apache.kafka.streams.state.ValueAndTimestamp; +public class KeyQuery<K, V> implements Query<V> { -import static java.util.Objects.requireNonNull; + private final K key; -public class ValueAndTimestampSerde<V> extends WrappingNullableSerde<ValueAndTimestamp<V>, Void, V> { - public ValueAndTimestampSerde(final Serde<V> valueSerde) { - super( - new ValueAndTimestampSerializer<>(requireNonNull(valueSerde, "valueSerde was null").serializer()), - new ValueAndTimestampDeserializer<>(requireNonNull(valueSerde, "valueSerde was null").deserializer()) - ); + private KeyQuery(final K key) { + this.key = key; } -} \ No newline at end of file + + public static <K, V> KeyQuery<K, V> withKey(final K key) { + return new KeyQuery<>(key); + } + + public K getKey() { + return key; + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/query/Position.java b/streams/src/main/java/org/apache/kafka/streams/query/Position.java new file mode 100644 index 0000000..5b0e981 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/query/Position.java @@ -0,0 +1,192 @@ +/* + 
* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.query; + + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.Set; + +public class Position { + + private final Map<String, Map<Integer, Long>> position; + + private Position(final Map<String, Map<Integer, Long>> position) { + this.position = position; + } + + public static Position emptyPosition() { + return new Position(new HashMap<>()); + } + + public static Position fromMap(final Map<String, Map<Integer, Long>> map) { + return new Position(deepCopy(map)); + } + + public Position withComponent(final String topic, final int partition, final long offset) { + final Map<String, Map<Integer, Long>> updated = deepCopy(position); + updated.computeIfAbsent(topic, k -> new HashMap<>()).put(partition, offset); + return new Position(updated); + } + + public Position merge(final Position other) { + if (other == null) { + return this; + } else { + final Map<String, Map<Integer, Long>> copy = deepCopy(position); + for (final Entry<String, Map<Integer, 
Long>> entry : other.position.entrySet()) { + final String topic = entry.getKey(); + final Map<Integer, Long> partitionMap = + copy.computeIfAbsent(topic, k -> new HashMap<>()); + for (final Entry<Integer, Long> partitionOffset : entry.getValue().entrySet()) { + final Integer partition = partitionOffset.getKey(); + final Long offset = partitionOffset.getValue(); + if (!partitionMap.containsKey(partition) + || partitionMap.get(partition) < offset) { + partitionMap.put(partition, offset); + } + } + } + return new Position(copy); + } + } + + public Set<String> getTopics() { + return Collections.unmodifiableSet(position.keySet()); + } + + public Map<Integer, Long> getBound(final String topic) { + return Collections.unmodifiableMap(position.get(topic)); + } + + public ByteBuffer serialize() { + final byte version = (byte) 0; + + int arraySize = Byte.SIZE; // version + + final int nTopics = position.size(); + arraySize += Integer.SIZE; + + final ArrayList<Entry<String, Map<Integer, Long>>> entries = + new ArrayList<>(position.entrySet()); + final byte[][] topics = new byte[entries.size()][]; + + for (int i = 0; i < nTopics; i++) { + final Entry<String, Map<Integer, Long>> entry = entries.get(i); + final byte[] topicBytes = entry.getKey().getBytes(StandardCharsets.UTF_8); + topics[i] = topicBytes; + arraySize += Integer.SIZE; // topic name length + arraySize += topicBytes.length; // topic name itself + + final Map<Integer, Long> partitionOffsets = entry.getValue(); + arraySize += Integer.SIZE; // Number of PartitionOffset pairs + arraySize += (Integer.SIZE + Long.SIZE) + * partitionOffsets.size(); // partitionOffsets themselves + } + + final ByteBuffer buffer = ByteBuffer.allocate(arraySize); + buffer.put(version); + + buffer.putInt(nTopics); + for (int i = 0; i < nTopics; i++) { + buffer.putInt(topics[i].length); + buffer.put(topics[i]); + + final Entry<String, Map<Integer, Long>> entry = entries.get(i); + final Map<Integer, Long> partitionOffsets = entry.getValue(); + 
buffer.putInt(partitionOffsets.size()); + for (final Entry<Integer, Long> partitionOffset : partitionOffsets.entrySet()) { + buffer.putInt(partitionOffset.getKey()); + buffer.putLong(partitionOffset.getValue()); + } + } + + buffer.flip(); + return buffer; + } + + public static Position deserialize(final ByteBuffer buffer) { + final byte version = buffer.get(); + + switch (version) { + case (byte) 0: + final int nTopics = buffer.getInt(); + final Map<String, Map<Integer, Long>> position = new HashMap<>(nTopics); + for (int i = 0; i < nTopics; i++) { + final int topicNameLength = buffer.getInt(); + final byte[] topicNameBytes = new byte[topicNameLength]; + buffer.get(topicNameBytes); + final String topic = new String(topicNameBytes, StandardCharsets.UTF_8); + + final int numPairs = buffer.getInt(); + final Map<Integer, Long> partitionOffsets = new HashMap<>(numPairs); + for (int j = 0; j < numPairs; j++) { + partitionOffsets.put(buffer.getInt(), buffer.getLong()); + } + position.put(topic, partitionOffsets); + } + return Position.fromMap(position); + default: + throw new IllegalArgumentException( + "Unknown version " + version + " when deserializing Position" + ); + } + } + + private static Map<String, Map<Integer, Long>> deepCopy( + final Map<String, Map<Integer, Long>> map) { + if (map == null) { + return new HashMap<>(); + } else { + final Map<String, Map<Integer, Long>> copy = new HashMap<>(map.size()); + for (final Entry<String, Map<Integer, Long>> entry : map.entrySet()) { + copy.put(entry.getKey(), new HashMap<>(entry.getValue())); + } + return copy; + } + } + + @Override + public String toString() { + return "Position{" + + "position=" + position + + '}'; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final Position position1 = (Position) o; + return Objects.equals(position, position1.position); + } + + @Override + public int hashCode() { 
+ return Objects.hash(position); + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/query/PositionBound.java b/streams/src/main/java/org/apache/kafka/streams/query/PositionBound.java new file mode 100644 index 0000000..a4dbe35 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/query/PositionBound.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.query; + + +import java.util.Objects; + +public class PositionBound { + + private final Position position; + private final boolean unbounded; + + private PositionBound(final Position position, final boolean unbounded) { + if (unbounded && position != null) { + throw new IllegalArgumentException(); + } + this.position = position; + this.unbounded = unbounded; + } + + public static PositionBound unbounded() { + return new PositionBound(null, true); + } + + public static PositionBound at(final Position position) { + return new PositionBound(position, false); + } + + public boolean isUnbounded() { + return unbounded; + } + + public Position position() { + if (unbounded) { + throw new IllegalArgumentException(); + } else { + return position; + } + } + + @Override + public String toString() { + if (isUnbounded()) { + return "PositionBound{unbounded}"; + } else { + return "PositionBound{position=" + position + '}'; + } + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final PositionBound that = (PositionBound) o; + return unbounded == that.unbounded && Objects.equals(position, that.position); + } + + @Override + public int hashCode() { + return Objects.hash(position, unbounded); + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java b/streams/src/main/java/org/apache/kafka/streams/query/Query.java similarity index 53% copy from streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java copy to streams/src/main/java/org/apache/kafka/streams/query/Query.java index 1936d29..988f904 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java +++ b/streams/src/main/java/org/apache/kafka/streams/query/Query.java @@ -14,19 +14,9 @@ * See the License for the specific language governing 
permissions and * limitations under the License. */ -package org.apache.kafka.streams.state.internals; +package org.apache.kafka.streams.query; -import org.apache.kafka.common.serialization.Serde; -import org.apache.kafka.streams.kstream.internals.WrappingNullableSerde; -import org.apache.kafka.streams.state.ValueAndTimestamp; -import static java.util.Objects.requireNonNull; +public interface Query<R> { -public class ValueAndTimestampSerde<V> extends WrappingNullableSerde<ValueAndTimestamp<V>, Void, V> { - public ValueAndTimestampSerde(final Serde<V> valueSerde) { - super( - new ValueAndTimestampSerializer<>(requireNonNull(valueSerde, "valueSerde was null").serializer()), - new ValueAndTimestampDeserializer<>(requireNonNull(valueSerde, "valueSerde was null").deserializer()) - ); - } -} \ No newline at end of file +} diff --git a/streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java b/streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java new file mode 100644 index 0000000..f3b92d6 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.query; + + +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.internals.StreamThread.State; + +import java.util.LinkedList; +import java.util.List; + +public final class QueryResult<R> { + + private final List<String> executionInfo = new LinkedList<>(); + private final FailureReason failureReason; + private final String failure; + private final R result; + private Position boundUpdate; + + private QueryResult(final R result) { + this.result = result; + this.failureReason = null; + this.failure = null; + } + + private QueryResult(final FailureReason failureReason, final String failure) { + this.result = null; + this.failureReason = failureReason; + this.failure = failure; + } + + public static <R> QueryResult<R> forResult(final R result) { + return new QueryResult<>(result); + } + + public static <R> QueryResult<R> forUnknownQueryType( + final Query<R> query, + final StateStore store) { + + return new QueryResult<>( + FailureReason.UNKNOWN_QUERY_TYPE, + "This store (" + store.getClass() + ") doesn't know how to execute " + + "the given query (" + query + ")." + + " Contact the store maintainer if you need support for a new query type."); + } + + public static <R> QueryResult<R> notUpToBound( + final Position currentPosition, + final PositionBound positionBound, + final int partition) { + + return new QueryResult<>( + FailureReason.NOT_UP_TO_BOUND, + "For store partition " + partition + ", the current position " + + currentPosition + " is not yet up to the bound " + + positionBound + ); + } + + public static <R> QueryResult<R> notActive( + final State state, + final boolean active, + final int partition) { + return new QueryResult<>( + FailureReason.NOT_UP_TO_BOUND, + "Query requires a running active task," + + " but partition " + partition + " was in state " + state + " and was " + + (active ? "active" : "not active") + "." 
+ ); + } + + + public <NewR> QueryResult<NewR> swapResult(final NewR typedResult) { + final QueryResult<NewR> queryResult = new QueryResult<>(typedResult); + queryResult.executionInfo.addAll(executionInfo); + queryResult.boundUpdate = boundUpdate; + return queryResult; + } + + public void addExecutionInfo(final String s) { + executionInfo.add(s); + } + + public void throwIfFailure() { + if (isFailure()) { + throw new RuntimeException(failureReason.name() + ": " + failure); + } + } + + public boolean isSuccess() { + return failureReason == null; + } + + public boolean isFailure() { + return failureReason != null; + } + + public List<String> getExecutionInfo() { + return executionInfo; + } + + public FailureReason getFailureReason() { + return failureReason; + } + + public String getFailure() { + return failure; + } + + public R getResult() { + if (result == null) { + throwIfFailure(); + } + // will return `null` if there's not a failure recorded. + return result; + } + + public void setPosition(final Position boundUpdate) { + this.boundUpdate = boundUpdate; + } + + public Position getPosition() { + return boundUpdate; + } + + @Override + public String toString() { + return "QueryResult{" + + "executionInfo=" + executionInfo + + ", failureReason=" + failureReason + + ", failure='" + failure + '\'' + + ", result=" + result + + ", boundUpdate=" + boundUpdate + + '}'; + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java b/streams/src/main/java/org/apache/kafka/streams/query/RawKeyQuery.java similarity index 53% copy from streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java copy to streams/src/main/java/org/apache/kafka/streams/query/RawKeyQuery.java index 1936d29..c42762e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java +++ b/streams/src/main/java/org/apache/kafka/streams/query/RawKeyQuery.java @@ -14,19 +14,27 @@ * See 
the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.kafka.streams.state.internals; +package org.apache.kafka.streams.query; -import org.apache.kafka.common.serialization.Serde; -import org.apache.kafka.streams.kstream.internals.WrappingNullableSerde; -import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.apache.kafka.common.utils.Bytes; -import static java.util.Objects.requireNonNull; +public class RawKeyQuery implements Query<byte[]> { -public class ValueAndTimestampSerde<V> extends WrappingNullableSerde<ValueAndTimestamp<V>, Void, V> { - public ValueAndTimestampSerde(final Serde<V> valueSerde) { - super( - new ValueAndTimestampSerializer<>(requireNonNull(valueSerde, "valueSerde was null").serializer()), - new ValueAndTimestampDeserializer<>(requireNonNull(valueSerde, "valueSerde was null").deserializer()) - ); + private final Bytes key; + + private RawKeyQuery(final Bytes key) { + this.key = key; + } + + public static RawKeyQuery withKey(final Bytes key) { + return new RawKeyQuery(key); + } + + public static RawKeyQuery withKey(final byte[] key) { + return new RawKeyQuery(Bytes.wrap(key)); + } + + public Bytes getKey() { + return key; } -} \ No newline at end of file +} diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java b/streams/src/main/java/org/apache/kafka/streams/query/RawScanQuery.java similarity index 53% copy from streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java copy to streams/src/main/java/org/apache/kafka/streams/query/RawScanQuery.java index 1936d29..d1f14f6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java +++ b/streams/src/main/java/org/apache/kafka/streams/query/RawScanQuery.java @@ -14,19 +14,16 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.kafka.streams.state.internals; +package org.apache.kafka.streams.query; -import org.apache.kafka.common.serialization.Serde; -import org.apache.kafka.streams.kstream.internals.WrappingNullableSerde; -import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.state.KeyValueIterator; -import static java.util.Objects.requireNonNull; +public class RawScanQuery implements Query<KeyValueIterator<Bytes, byte[]>> { -public class ValueAndTimestampSerde<V> extends WrappingNullableSerde<ValueAndTimestamp<V>, Void, V> { - public ValueAndTimestampSerde(final Serde<V> valueSerde) { - super( - new ValueAndTimestampSerializer<>(requireNonNull(valueSerde, "valueSerde was null").serializer()), - new ValueAndTimestampDeserializer<>(requireNonNull(valueSerde, "valueSerde was null").deserializer()) - ); + private RawScanQuery() {} + + public static RawScanQuery scan() { + return new RawScanQuery(); } -} \ No newline at end of file +} diff --git a/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java b/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java index f9f0bdc..da7927e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java @@ -212,4 +212,13 @@ public final class StateSerdes<K, V> { e); } } + + @Override + public String toString() { + return "StateSerdes{" + + "topic='" + topic + '\'' + + ", keySerde=" + keySerde + + ", valueSerde=" + valueSerde + + '}'; + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java index bfee6b2..c1f7461 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java +++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java @@ -27,6 +27,10 @@ import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.internals.ProcessorContextUtils; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics; +import org.apache.kafka.streams.query.PositionBound; +import org.apache.kafka.streams.query.Query; +import org.apache.kafka.streams.query.QueryResult; +import org.apache.kafka.streams.query.RawKeyQuery; import org.apache.kafka.streams.state.KeyValueIterator; import org.rocksdb.RocksDBException; import org.rocksdb.WriteBatch; @@ -279,6 +283,20 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se return open; } + @Override + public <R> QueryResult<R> query( + final Query<R> query, + final PositionBound positionBound, + final boolean collectExecutionInfo) { + if (query instanceof RawKeyQuery) { + final Bytes key = ((RawKeyQuery) query).getKey(); + final byte[] bytes = get(key); + return QueryResult.forResult((R) bytes); + } else { + return QueryResult.forUnknownQueryType(query, this); + } + } + // Visible for testing List<S> getSegments() { return segments.allSegments(false); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java index f0c6dbe..f9bc34e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java @@ -21,6 +21,9 @@ import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; +import 
org.apache.kafka.streams.query.PositionBound; +import org.apache.kafka.streams.query.Query; +import org.apache.kafka.streams.query.QueryResult; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.slf4j.Logger; @@ -73,6 +76,15 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> { } @Override + public <R> QueryResult<R> query( + final Query<R> query, + final PositionBound positionBound, + final boolean collectExecutionInfo) { + + return StoreQueryUtils.requireKVQuery(query, this, collectExecutionInfo); + } + + @Override public synchronized byte[] get(final Bytes key) { return map.get(key); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java index ba8a745..e13b357 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java @@ -40,6 +40,9 @@ import org.apache.kafka.streams.processor.internals.RecordCollector; import org.apache.kafka.streams.processor.internals.RecordQueue; import org.apache.kafka.streams.processor.internals.SerdeGetter; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.query.PositionBound; +import org.apache.kafka.streams.query.Query; +import org.apache.kafka.streams.query.QueryResult; import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.ValueAndTimestamp; import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBufferChangelogDeserializationHelper.DeserializationResult; @@ -240,6 +243,15 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere } @Override + public <R> QueryResult<R> query( + final 
Query<R> query, + final PositionBound positionBound, + final boolean collectExecutionInfo) { + + return QueryResult.forUnknownQueryType(query, this); + } + + @Override public void close() { open = false; index.clear(); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java index 22f1215..1bea349 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java @@ -22,6 +22,9 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.query.PositionBound; +import org.apache.kafka.streams.query.Query; +import org.apache.kafka.streams.query.QueryResult; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; @@ -108,6 +111,15 @@ public class MemoryLRUCache implements KeyValueStore<Bytes, byte[]> { } @Override + public <R> QueryResult<R> query( + final Query<R> query, + final PositionBound positionBound, + final boolean collectExecutionInfo) { + + return StoreQueryUtils.requireKVQuery(query, this, collectExecutionInfo); + } + + @Override public synchronized byte[] get(final Bytes key) { Objects.requireNonNull(key); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java index 937288c..21b8e38 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java @@ -34,6 +34,11 @@ import 
org.apache.kafka.streams.processor.internals.ProcessorContextUtils; import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.processor.internals.SerdeGetter; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.query.KeyQuery; +import org.apache.kafka.streams.query.PositionBound; +import org.apache.kafka.streams.query.Query; +import org.apache.kafka.streams.query.QueryResult; +import org.apache.kafka.streams.query.RawKeyQuery; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StateSerdes; @@ -186,6 +191,38 @@ public class MeteredKeyValueStore<K, V> return false; } + @SuppressWarnings("unchecked") + @Override + public <R> QueryResult<R> query(final Query<R> query, + final PositionBound positionBound, + final boolean collectExecutionInfo) { + + final long start = System.nanoTime(); + final QueryResult<R> result; + + if (query instanceof KeyQuery) { + final KeyQuery<K, V> typedQuery = (KeyQuery<K, V>) query; + final RawKeyQuery rawKeyQuery = RawKeyQuery.withKey(keyBytes(typedQuery.getKey())); + final QueryResult<byte[]> rawResult = + wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo); + if (rawResult.isSuccess()) { + final V value = outerValue(rawResult.getResult()); + final QueryResult<V> typedQueryResult = + rawResult.swapResult(value); + result = (QueryResult<R>) typedQueryResult; + } else { + // the generic type doesn't matter, since failed queries have no result set. 
+ result = (QueryResult<R>) rawResult; + } + } else { + result = wrapped().query(query, positionBound, collectExecutionInfo); + } + final long end = System.nanoTime(); + result.addExecutionInfo( + "Handled in " + getClass() + " with serdes " + serdes + " in " + (end - start) + "ns"); + return result; + } + @Override public V get(final K key) { Objects.requireNonNull(key, "key cannot be null"); @@ -324,6 +361,10 @@ public class MeteredKeyValueStore<K, V> } } + public StateSerdes<K, V> serdes() { + return serdes; + } + private class MeteredKeyValueIterator implements KeyValueIterator<K, V> { private final KeyValueIterator<Bytes, byte[]> iter; diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java index b1eb948..87b3b40 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java @@ -372,4 +372,8 @@ public class MeteredSessionStore<K, V> e2eLatencySensor.record(e2eLatency, currentTime); } } + + public StateSerdes<K, V> serdes() { + return serdes; + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java index 0970703..1f267bd 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java @@ -33,6 +33,11 @@ import org.apache.kafka.streams.processor.internals.ProcessorContextUtils; import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.processor.internals.SerdeGetter; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import 
org.apache.kafka.streams.query.KeyQuery; +import org.apache.kafka.streams.query.PositionBound; +import org.apache.kafka.streams.query.Query; +import org.apache.kafka.streams.query.QueryResult; +import org.apache.kafka.streams.query.RawKeyQuery; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.StateSerdes; import org.apache.kafka.streams.state.WindowStore; @@ -63,11 +68,11 @@ public class MeteredWindowStore<K, V> private TaskId taskId; MeteredWindowStore(final WindowStore<Bytes, byte[]> inner, - final long windowSizeMs, - final String metricsScope, - final Time time, - final Serde<K> keySerde, - final Serde<V> valueSerde) { + final long windowSizeMs, + final String metricsScope, + final Time time, + final Serde<K> keySerde, + final Serde<V> valueSerde) { super(inner); this.windowSizeMs = windowSizeMs; this.metricsScope = metricsScope; @@ -87,7 +92,8 @@ public class MeteredWindowStore<K, V> registerMetrics(); final Sensor restoreSensor = - StateStoreMetrics.restoreSensor(taskId.toString(), metricsScope, name(), streamsMetrics); + StateStoreMetrics.restoreSensor(taskId.toString(), metricsScope, name(), + streamsMetrics); // register and possibly restore the state from the logs maybeMeasureLatency(() -> super.init(context, root), time, restoreSensor); @@ -103,20 +109,26 @@ public class MeteredWindowStore<K, V> registerMetrics(); final Sensor restoreSensor = - StateStoreMetrics.restoreSensor(taskId.toString(), metricsScope, name(), streamsMetrics); + StateStoreMetrics.restoreSensor(taskId.toString(), metricsScope, name(), + streamsMetrics); // register and possibly restore the state from the logs maybeMeasureLatency(() -> super.init(context, root), time, restoreSensor); } + protected Serde<V> prepareValueSerde(final Serde<V> valueSerde, final SerdeGetter getter) { return WrappingNullableUtils.prepareValueSerde(valueSerde, getter); } private void registerMetrics() { - putSensor = StateStoreMetrics.putSensor(taskId.toString(), 
metricsScope, name(), streamsMetrics); - fetchSensor = StateStoreMetrics.fetchSensor(taskId.toString(), metricsScope, name(), streamsMetrics); - flushSensor = StateStoreMetrics.flushSensor(taskId.toString(), metricsScope, name(), streamsMetrics); - e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(taskId.toString(), metricsScope, name(), streamsMetrics); + putSensor = StateStoreMetrics.putSensor(taskId.toString(), metricsScope, name(), + streamsMetrics); + fetchSensor = StateStoreMetrics.fetchSensor(taskId.toString(), metricsScope, name(), + streamsMetrics); + flushSensor = StateStoreMetrics.flushSensor(taskId.toString(), metricsScope, name(), + streamsMetrics); + e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(taskId.toString(), metricsScope, + name(), streamsMetrics); } @Deprecated @@ -126,7 +138,8 @@ public class MeteredWindowStore<K, V> serdes = new StateSerdes<>( changelogTopic != null ? changelogTopic : - ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName, taskId.topologyName()), + ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName, + taskId.topologyName()), prepareKeySerde(keySerde, new SerdeGetter(context)), prepareValueSerde(valueSerde, new SerdeGetter(context))); } @@ -137,7 +150,8 @@ public class MeteredWindowStore<K, V> serdes = new StateSerdes<>( changelogTopic != null ? 
changelogTopic : - ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName, taskId.topologyName()), + ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName, + taskId.topologyName()), prepareKeySerde(keySerde, new SerdeGetter(context)), prepareValueSerde(valueSerde, new SerdeGetter(context))); } @@ -145,7 +159,7 @@ public class MeteredWindowStore<K, V> @SuppressWarnings("unchecked") @Override public boolean setFlushListener(final CacheFlushListener<Windowed<K>, V> listener, - final boolean sendOldValues) { + final boolean sendOldValues) { final WindowStore<Bytes, byte[]> wrapped = wrapped(); if (wrapped instanceof CachedStateStore) { return ((CachedStateStore<byte[], byte[]>) wrapped).setFlushListener( @@ -161,10 +175,43 @@ public class MeteredWindowStore<K, V> return false; } + @SuppressWarnings("unchecked") + @Override + public <R> QueryResult<R> query( + final Query<R> query, + final PositionBound positionBound, + final boolean collectExecutionInfo + ) { + if (query instanceof KeyQuery) { + final Windowed<K> key = ((KeyQuery<Windowed<K>, V>) query).getKey(); + final Bytes bytes = keyBytes(key.key()); + // NOTE: we need to _fully_ serialize the key, since we can't pass in any + // extra timestamp information in the RawKeyQuery. So, we go ahead and use the + // internal store's binary schema. This works for the provided KS windowed stores, + // but a custom store that uses a different schema will have to use the WindowKeySchema + // to read the data back out of the array and convert it to whatever the real binary + // key is. + // seqnum hard-coded to zero since we don't query stream-stream join stores. 
+ final Bytes storeKey = WindowKeySchema.toStoreKeyBinary( + bytes, + key.window().start(), + 0 + ); + final RawKeyQuery rawKeyQuery = RawKeyQuery.withKey(storeKey); + final QueryResult<byte[]> rawResult = + wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo); + final V v = serdes.valueFrom(rawResult.getResult()); + final QueryResult<V> result = rawResult.swapResult(v); + return (QueryResult<R>) result; + } else { + return super.query(query, positionBound, collectExecutionInfo); + } + } + @Override public void put(final K key, - final V value, - final long windowStartTimestamp) { + final V value, + final long windowStartTimestamp) { Objects.requireNonNull(key, "key cannot be null"); try { maybeMeasureLatency( @@ -181,7 +228,7 @@ public class MeteredWindowStore<K, V> @Override public V fetch(final K key, - final long timestamp) { + final long timestamp) { Objects.requireNonNull(key, "key cannot be null"); return maybeMeasureLatency( () -> { @@ -198,8 +245,8 @@ public class MeteredWindowStore<K, V> @Override public WindowStoreIterator<V> fetch(final K key, - final long timeFrom, - final long timeTo) { + final long timeFrom, + final long timeTo) { Objects.requireNonNull(key, "key cannot be null"); return new MeteredWindowStoreIterator<>( wrapped().fetch(keyBytes(key), timeFrom, timeTo), @@ -212,8 +259,8 @@ public class MeteredWindowStore<K, V> @Override public WindowStoreIterator<V> backwardFetch(final K key, - final long timeFrom, - final long timeTo) { + final long timeFrom, + final long timeTo) { Objects.requireNonNull(key, "key cannot be null"); return new MeteredWindowStoreIterator<>( wrapped().backwardFetch(keyBytes(key), timeFrom, timeTo), @@ -226,9 +273,9 @@ public class MeteredWindowStore<K, V> @Override public KeyValueIterator<Windowed<K>, V> fetch(final K keyFrom, - final K keyTo, - final long timeFrom, - final long timeTo) { + final K keyTo, + final long timeFrom, + final long timeTo) { return new MeteredWindowedKeyValueIterator<>( 
wrapped().fetch( keyBytes(keyFrom), @@ -243,9 +290,9 @@ public class MeteredWindowStore<K, V> @Override public KeyValueIterator<Windowed<K>, V> backwardFetch(final K keyFrom, - final K keyTo, - final long timeFrom, - final long timeTo) { + final K keyTo, + final long timeFrom, + final long timeTo) { return new MeteredWindowedKeyValueIterator<>( wrapped().backwardFetch( keyBytes(keyFrom), @@ -260,7 +307,7 @@ public class MeteredWindowStore<K, V> @Override public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom, - final long timeTo) { + final long timeTo) { return new MeteredWindowedKeyValueIterator<>( wrapped().fetchAll(timeFrom, timeTo), fetchSensor, @@ -271,7 +318,7 @@ public class MeteredWindowStore<K, V> @Override public KeyValueIterator<Windowed<K>, V> backwardFetchAll(final long timeFrom, - final long timeTo) { + final long timeTo) { return new MeteredWindowedKeyValueIterator<>( wrapped().backwardFetchAll(timeFrom, timeTo), fetchSensor, @@ -282,12 +329,14 @@ public class MeteredWindowStore<K, V> @Override public KeyValueIterator<Windowed<K>, V> all() { - return new MeteredWindowedKeyValueIterator<>(wrapped().all(), fetchSensor, streamsMetrics, serdes, time); + return new MeteredWindowedKeyValueIterator<>(wrapped().all(), fetchSensor, streamsMetrics, + serdes, time); } @Override public KeyValueIterator<Windowed<K>, V> backwardAll() { - return new MeteredWindowedKeyValueIterator<>(wrapped().backwardAll(), fetchSensor, streamsMetrics, serdes, time); + return new MeteredWindowedKeyValueIterator<>(wrapped().backwardAll(), fetchSensor, + streamsMetrics, serdes, time); } @Override @@ -313,8 +362,12 @@ public class MeteredWindowStore<K, V> // In that case, we _can't_ get the current timestamp, so we don't record anything. 
if (e2eLatencySensor.shouldRecord() && context != null) { final long currentTime = time.milliseconds(); - final long e2eLatency = currentTime - context.timestamp(); + final long e2eLatency = currentTime - context.timestamp(); e2eLatencySensor.record(e2eLatency, currentTime); } } + + public StateSerdes<K, V> serdes() { + return serdes; + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java index 07cf0ee..aedb67c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java @@ -20,6 +20,7 @@ import org.apache.kafka.streams.StoreQueryParameters; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.state.QueryableStoreType; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -71,4 +72,8 @@ public class QueryableStoreProvider { public void removeStoreProviderForThread(final String threadName) { this.storeProviders.remove(threadName); } + + public Collection<StreamThreadStateStoreProvider> getStoreProviders() { + return storeProviders.values(); + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index aa1b1ba..e9ea595 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -28,6 +28,11 @@ import org.apache.kafka.streams.processor.BatchingStateRestoreCallback; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreContext; +import 
org.apache.kafka.streams.processor.api.RecordMetadata; +import org.apache.kafka.streams.query.Position; +import org.apache.kafka.streams.query.PositionBound; +import org.apache.kafka.streams.query.Query; +import org.apache.kafka.streams.query.QueryResult; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.RocksDBConfigSetter; @@ -61,6 +66,7 @@ import java.nio.file.Files; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -105,6 +111,8 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS private final RocksDBMetricsRecorder metricsRecorder; protected volatile boolean open = false; + private StateStoreContext context; + private Map<String, Map<Integer, Long>> seenOffsets = new HashMap<>(); RocksDBStore(final String name, final String metricsScope) { @@ -252,6 +260,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS // value getter should always read directly from rocksDB // since it is only for values that are already flushed context.register(root, new RocksDBBatchingRestoreCallback(this)); + this.context = context; } @Override @@ -269,6 +278,29 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS return open; } + @Override + public <R> QueryResult<R> query( + final Query<R> query, + final PositionBound positionBound, + final boolean collectExecutionInfo) { + + if (context == null) { + throw new IllegalStateException("Store is not yet initialized"); + } else { + final int partition = this.context.taskId().partition(); + if (StoreQueryUtils.isPermitted(seenOffsets, positionBound, partition)) { + final QueryResult<R> result = StoreQueryUtils.requireKVQuery(query, this, + collectExecutionInfo); + final Position currentPosition = 
Position.fromMap(seenOffsets); + result.setPosition(currentPosition); + return result; + } else { + return QueryResult.notUpToBound(Position.fromMap(seenOffsets), positionBound, + partition); + } + } + } + private void validateStoreOpen() { if (!open) { throw new InvalidStateStoreException("Store " + name + " is currently closed"); @@ -281,6 +313,16 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS Objects.requireNonNull(key, "key cannot be null"); validateStoreOpen(); dbAccessor.put(key.get(), value); + // FIXME record metadata can be null because when this store is used as a Segment, + // we never call init(). Is that correct? + // to make this logic work properly for segmented stores, we either need to + // track the seen offsets one level up (in the RocksDBSegmentedBytesStore) OR + // we need to get a reference to the context here. + if (context != null && context.recordMetadata().isPresent()) { + final RecordMetadata meta = context.recordMetadata().get(); + seenOffsets.computeIfAbsent(meta.topic(), t -> new HashMap<>()) + .put(meta.partition(), meta.offset()); + } } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java new file mode 100644 index 0000000..603257c --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.query.Position; +import org.apache.kafka.streams.query.PositionBound; +import org.apache.kafka.streams.query.Query; +import org.apache.kafka.streams.query.QueryResult; +import org.apache.kafka.streams.query.RawKeyQuery; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.KeyValueStore; + +import java.util.Map; + +public final class StoreQueryUtils { + + // make this class uninstantiable + private StoreQueryUtils() { + } + + public static <R> QueryResult<R> requireKVQuery( + final Query<R> query, + final KeyValueStore<Bytes, byte[]> kvStore, + final boolean enableExecutionInfo) { + final QueryResult<R> r = StoreQueryUtils.handleKVQuery(query, kvStore, enableExecutionInfo); + r.throwIfFailure(); + return r; + } + + public static <R> QueryResult<R> handleKVQuery( + final Query<R> query, + final KeyValueStore<Bytes, byte[]> kvStore, + final boolean enableExecutionInfo) { + + final long start = System.nanoTime(); + final String name = query.getClass().getCanonicalName(); + switch (name) { + case "org.apache.kafka.streams.query.RawKeyQuery": { + final RawKeyQuery keyQuery = (RawKeyQuery) query; + return handleRawKeyQuery(kvStore, enableExecutionInfo, start, keyQuery); + } + case "org.apache.kafka.streams.query.RawScanQuery": { + final KeyValueIterator<Bytes, byte[]> iterator = kvStore.all(); + @SuppressWarnings("unchecked") final R result = (R) iterator; + final long end = 
System.nanoTime();
+                final QueryResult<R> queryResult = QueryResult.forResult(result);
+                if (enableExecutionInfo) {
+                    queryResult.addExecutionInfo("Handled on " + kvStore.getClass().getName()
+                        + "#all via StoreQueryUtils" + " in " + (end - start) + "ns");
+                }
+                return queryResult;
+            }
+            default:
+                return QueryResult.forUnknownQueryType(query, kvStore);
+        }
+    }
+
+    public static <R> QueryResult<R> handleRawKeyQuery(
+        final KeyValueStore<Bytes, byte[]> kvStore,
+        final boolean enableExecutionInfo,
+        final long start,
+        final RawKeyQuery keyQuery) {
+        // Point lookup on the bytes store; the raw value is returned as-is, without deserialization.
+        final Bytes key = keyQuery.getKey();
+        final byte[] value = kvStore.get(key);
+        @SuppressWarnings("unchecked") final R result = (R) value;
+        final long end = System.nanoTime();
+        // 'start' is supplied by the caller, so the reported duration is measured from the caller's clock reading.
+        final QueryResult<R> queryResult = QueryResult.forResult(result);
+        if (enableExecutionInfo) {
+            queryResult.addExecutionInfo("Handled on " + kvStore.getClass().getName()
+                + "#get via StoreQueryUtils" + " in " + (end - start) + "ns");
+        }
+        return queryResult;
+    }
+
+    public static boolean isPermitted(
+        final Map<String, Map<Integer, Long>> seenOffsets,
+        final PositionBound positionBound,
+        final int partition) {
+        if (positionBound.isUnbounded()) {
+            return true;
+        } else {
+            final Position position = positionBound.position();
+            for (final String topic : position.getTopics()) {
+                final Map<Integer, Long> partitionBounds = position.getBound(topic);
+                final Map<Integer, Long> seenPartitionBounds = seenOffsets.get(topic);
+                if (!partitionBounds.containsKey(partition)) {
+                    // this topic isn't bounded for our partition, so just skip over it.
+                } else {
+                    if (seenPartitionBounds == null) {
+                        // we haven't seen a topic that is bounded for our partition
+                        return false;
+                    } else if (!seenPartitionBounds.containsKey(partition)) {
+                        // we haven't seen a partition that we have a bound for
+                        return false;
+                    } else if (seenPartitionBounds.get(partition) < partitionBounds.get(
+                        partition)) {
+                        // our current position is behind the bound
+                        return false;
+                    }
+                }
+            }
+            return true;
+        }
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java
index a249a14..f4eb55e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java
@@ -24,6 +24,9 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryResult;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -185,6 +188,29 @@ public class TimestampedKeyValueStoreBuilder<K, V>
         }
 
+        /**
+         * Forwards the query to the wrapped store and, when requested, appends a timing entry.
+         *
+         * @param query                the query to execute
+         * @param positionBound        the position bound, passed through to the wrapped store
+         * @param collectExecutionInfo whether execution details should be added to the result
+         * @return the result produced by the wrapped store
+         */
         @Override
+        public <R> QueryResult<R> query(final Query<R> query,
+                                        final PositionBound positionBound,
+                                        final boolean collectExecutionInfo) {
+            final long start = System.nanoTime();
+            final QueryResult<R> result = wrapped.query(query, positionBound, collectExecutionInfo);
+            // Honor the flag: timing details are only attached when the caller asked for them,
+            // as WrappedStateStore#query already does.
+            if (collectExecutionInfo) {
+                final long end = System.nanoTime();
+                result.addExecutionInfo("Handled in " + getClass() + " in " + (end - start) + "ns");
+            }
+            return result;
+        }
+
+        @Override
         public String name() {
return wrapped.name();
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java
index b3727f5..9b28ef5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java
@@ -23,6 +23,9 @@ import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryResult;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.TimestampedBytesStore;
 import org.apache.kafka.streams.state.TimestampedWindowStore;
@@ -203,6 +206,29 @@ public class TimestampedWindowStoreBuilder<K, V>
         }
 
+        /**
+         * Forwards the query to the wrapped store and, when requested, appends a timing entry.
+         *
+         * @param query                the query to execute
+         * @param positionBound        the position bound, passed through to the wrapped store
+         * @param collectExecutionInfo whether execution details should be added to the result
+         * @return the result produced by the wrapped store
+         */
         @Override
+        public <R> QueryResult<R> query(final Query<R> query,
+                                        final PositionBound positionBound,
+                                        final boolean collectExecutionInfo) {
+            final long start = System.nanoTime();
+            final QueryResult<R> result = wrapped.query(query, positionBound, collectExecutionInfo);
+            // Honor the flag: timing details are only attached when the caller asked for them,
+            // as WrappedStateStore#query already does.
+            if (collectExecutionInfo) {
+                final long end = System.nanoTime();
+                result.addExecutionInfo("Handled in " + getClass() + " in " + (end - start) + "ns");
+            }
+            return result;
+        }
+
+        @Override
         public String name() {
             return wrapped.name();
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java
index 1936d29..599b6e7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java
@@ -23,10 +23,23 @@ import org.apache.kafka.streams.state.ValueAndTimestamp;
 import static java.util.Objects.requireNonNull;
 
 public class ValueAndTimestampSerde<V> extends WrappingNullableSerde<ValueAndTimestamp<V>, Void, V> {
+    // Kept so that toString() can report which value serde this instance was built with.
+    private final Serde<V> valueSerde;
+
     public ValueAndTimestampSerde(final Serde<V> valueSerde) {
         super(
             new ValueAndTimestampSerializer<>(requireNonNull(valueSerde, "valueSerde was null").serializer()),
             new ValueAndTimestampDeserializer<>(requireNonNull(valueSerde, "valueSerde was null").deserializer())
         );
+        this.valueSerde = valueSerde;
+    }
+
+    /**
+     * Renders this serde together with the value serde it was constructed with.
+     */
+    @Override
+    public String toString() {
+        final String inner = "valueSerde=" + valueSerde;
+        return "ValueAndTimestampSerde{" + inner + '}';
     }
 }
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java
index f7999d3..2b626cf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java
@@ -21,6 +21,9 @@ import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryResult;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
@@ -183,6 +186,15 @@ class WindowToTimestampedWindowByteStoreAdapter implements WindowStore<Bytes, by
         return store.isOpen();
     }
 
+    @Override
+    public <R> QueryResult<R> query(
+        final Query<R> query,
+        final PositionBound positionBound,
+        final boolean collectExecutionInfo) {
+
+        return QueryResult.forUnknownQueryType(query, this);
+    }
+
 
     private static class WindowToTimestampedWindowIteratorAdapter
         extends KeyValueToTimestampedKeyValueIteratorAdapter<Long>
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
index e8244f7..e904a48 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
@@ -20,6 +20,9 @@ import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryResult;
 import org.apache.kafka.streams.state.TimestampedBytesStore;
 
 /**
@@ -103,6 +106,30 @@ public abstract class WrappedStateStore<S extends StateStore, K, V> implements S
         wrapped.close();
     }
 
+    /**
+     * Runs the query against the wrapped store, optionally noting how long the delegation took.
+     *
+     * @param query                the query to execute
+     * @param positionBound        the position bound, forwarded to the wrapped store
+     * @param collectExecutionInfo whether to append an execution-info entry for this layer
+     * @return the wrapped store's result
+     */
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+                                    final PositionBound positionBound,
+                                    final boolean collectExecutionInfo) {
+        final long startNs = System.nanoTime();
+        final QueryResult<R> delegateResult =
+            wrapped().query(query, positionBound, collectExecutionInfo);
+        if (!collectExecutionInfo) {
+            return delegateResult;
+        }
+        final long elapsedNs = System.nanoTime() - startNs;
+        delegateResult.addExecutionInfo(
+            "Handled in " + getClass() + " via WrappedStateStore" + " in " + elapsedNs + "ns");
+        return delegateResult;
+    }
+
     public S wrapped() {
         return wrapped;
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java new file mode 100644 index 0000000..23374f3 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java @@ -0,0 +1,460 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.integration; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.CloseableIterator; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.KeyValueTimestamp; +import org.apache.kafka.streams.StoreQueryParameters; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.TimeWindows; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.internals.TimeWindow; +import org.apache.kafka.streams.query.FailureReason; +import org.apache.kafka.streams.query.InteractiveQueryRequest; +import org.apache.kafka.streams.query.InteractiveQueryResult; +import org.apache.kafka.streams.query.Iterators; +import org.apache.kafka.streams.query.KeyQuery; +import org.apache.kafka.streams.query.Position; +import org.apache.kafka.streams.query.QueryResult; +import org.apache.kafka.streams.query.RawKeyQuery; +import org.apache.kafka.streams.query.RawScanQuery; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.QueryableStoreTypes; +import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; +import org.apache.kafka.streams.state.StateSerdes; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import 
org.apache.kafka.streams.state.WindowStore; +import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.TestUtils; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +import java.io.IOException; +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; +import java.util.Properties; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.streams.query.InteractiveQueryRequest.inStore; +import static org.apache.kafka.streams.query.PositionBound.at; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThrows; + +@Category({IntegrationTest.class}) +public class IQv2IntegrationTest { + + private static final int NUM_BROKERS = 1; + private static int port = 0; + private static final String INPUT_TOPIC_NAME = "input-topic"; + private static final String INPUT2_TOPIC_NAME = "input2-topic"; + private static final String UNCACHED_TABLE = "uncached-table"; + private static final String UNCACHED_COUNTS_TABLE = "uncached-counts-table"; + private static final String CACHED_TABLE = "cached-table"; + + public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS); + + @Rule + public TestName testName = new TestName(); + private static KafkaStreams kafkaStreams; + + @BeforeClass + public static void before() throws InterruptedException, IOException { + CLUSTER.start(); + CLUSTER.createTopic(INPUT_TOPIC_NAME, 2, 1); + 
CLUSTER.createTopic(INPUT2_TOPIC_NAME, 2, 1); + + final Semaphore semaphore = new Semaphore(0); + + final StreamsBuilder builder = new StreamsBuilder(); + builder + .table( + INPUT_TOPIC_NAME, + Consumed.with(Serdes.Integer(), Serdes.Integer()), + Materialized.<Integer, Integer, KeyValueStore<Bytes, byte[]>>as(UNCACHED_TABLE) + .withCachingDisabled() + ) + .filter( + (a, b) -> true, + Materialized.as(CACHED_TABLE) + ) + .toStream() + .peek((k, v) -> semaphore.release()); + + builder + .stream( + INPUT2_TOPIC_NAME, + Consumed.with(Serdes.Integer(), Serdes.Integer()) + ) + .groupByKey() + .windowedBy(TimeWindows.ofSizeAndGrace( + Duration.ofMillis(100), + Duration.ZERO + )) + .count( + Materialized + .<Integer, Long, WindowStore<Bytes, byte[]>>as(UNCACHED_COUNTS_TABLE) + .withCachingDisabled() + ) + .toStream() + .peek((k, v) -> semaphore.release()); + + kafkaStreams = + IntegrationTestUtils.getRunningStreams(streamsConfiguration(), builder, true); + + final Properties producerProps = new Properties(); + producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); + producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); + + IntegrationTestUtils.produceKeyValuesSynchronously( + INPUT_TOPIC_NAME, + Arrays.asList(new KeyValue<>(1, 1), new KeyValue<>(2, 2), new KeyValue<>(3, 3)), + producerProps, + Time.SYSTEM + ); + // Assert that all messages in the first batch were processed in a timely manner + assertThat(semaphore.tryAcquire(3, 60, TimeUnit.SECONDS), is(equalTo(true))); + + IntegrationTestUtils.produceSynchronously( + producerProps, + false, + INPUT2_TOPIC_NAME, + Optional.empty(), + Arrays.asList( + new KeyValueTimestamp<>(1, 1, 0), + new KeyValueTimestamp<>(1, 1, 10) + ) + ); + + // Assert that we processed the second batch (should see both updates, since caching is disabled) + assertThat(semaphore.tryAcquire(2, 
60, TimeUnit.SECONDS), is(equalTo(true))); + + + } + + @AfterClass + public static void after() { + kafkaStreams.close(Duration.of(1, ChronoUnit.MINUTES)); + kafkaStreams.cleanUp(); + CLUSTER.stop(); + } + + @Test + public void shouldQueryKeyFromCachedTable() { + + final StateSerdes<Integer, ValueAndTimestamp<Integer>> serdes = + kafkaStreams.serdesForStore(CACHED_TABLE); + + final byte[] rawKey = serdes.rawKey(1); + final InteractiveQueryResult<byte[]> result = kafkaStreams.query( + inStore(CACHED_TABLE).withQuery(RawKeyQuery.withKey(rawKey))); + + System.out.println("|||" + result); + final QueryResult<byte[]> rawValueResult = result.getPartitionResults().get(0); + final ValueAndTimestamp<Integer> value = + serdes.valueFrom(rawValueResult.getResult()); + System.out.println("|||" + value); + + assertThat(value.value(), is(1)); + assertThat(result.getPosition(), + is(Position.fromMap( + mkMap(mkEntry("input-topic", mkMap(mkEntry(0, 0L), mkEntry(1, 1L))))))); + } + + @Test + public void shouldQueryKeyFromUncachedTable() { + + final StateSerdes<Integer, ValueAndTimestamp<Integer>> serdes = + kafkaStreams.serdesForStore(UNCACHED_TABLE); + + final byte[] rawKey = serdes.rawKey(1); + final InteractiveQueryResult<byte[]> result = kafkaStreams.query( + inStore(UNCACHED_TABLE).withQuery(RawKeyQuery.withKey(rawKey))); + + System.out.println("|||" + result); + final QueryResult<byte[]> rawValueResult = result.getPartitionResults().get(0); + final ValueAndTimestamp<Integer> value = + serdes.valueFrom(rawValueResult.getResult()); + System.out.println("|||" + value); + + assertThat(value.value(), is(1)); + assertThat(result.getPosition(), + is(Position.fromMap( + mkMap(mkEntry("input-topic", mkMap(mkEntry(0, 0L), mkEntry(1, 1L))))))); + } + + @Test + public void shouldQueryTypedKeyFromUncachedTable() { + final Integer key = 1; + + final InteractiveQueryRequest<ValueAndTimestamp<Integer>> query = + inStore(UNCACHED_TABLE).withQuery(KeyQuery.withKey(key)); + + final 
InteractiveQueryResult<ValueAndTimestamp<Integer>> result = kafkaStreams.query(query); + + final ValueAndTimestamp<Integer> value = result.getOnlyPartitionResult().getResult(); + + assertThat(value.value(), is(1)); + assertThat(result.getPosition(), + is(Position.fromMap( + mkMap(mkEntry("input-topic", mkMap(mkEntry(0, 0L), mkEntry(1, 1L))))))); + } + + @Test + public void exampleKeyQueryIntoWindowStore() { + final Windowed<Integer> key = new Windowed<>(1, new TimeWindow(0L, 99L)); + + final InteractiveQueryRequest<ValueAndTimestamp<Long>> query = + inStore(UNCACHED_COUNTS_TABLE).withQuery(KeyQuery.withKey(key)); + + final InteractiveQueryResult<ValueAndTimestamp<Long>> result = kafkaStreams.query(query); + + final ValueAndTimestamp<Long> value = result.getOnlyPartitionResult().getResult(); + + assertThat(value.value(), is(2L)); + } + + @Test + public void shouldScanUncachedTablePartitions() { + + final StateSerdes<Integer, ValueAndTimestamp<Integer>> serdes = + kafkaStreams.serdesForStore(UNCACHED_TABLE); + + final InteractiveQueryResult<KeyValueIterator<Bytes, byte[]>> scanResult = + kafkaStreams.query(inStore(UNCACHED_TABLE).withQuery(RawScanQuery.scan())); + + System.out.println("|||" + scanResult); + final Map<Integer, QueryResult<KeyValueIterator<Bytes, byte[]>>> partitionResults = + scanResult.getPartitionResults(); + for (final Entry<Integer, QueryResult<KeyValueIterator<Bytes, byte[]>>> entry : partitionResults.entrySet()) { + try (final KeyValueIterator<Bytes, byte[]> keyValueIterator = + entry.getValue().getResult()) { + while (keyValueIterator.hasNext()) { + final KeyValue<Bytes, byte[]> next = keyValueIterator.next(); + System.out.println( + "|||" + entry.getKey() + + " " + serdes.keyFrom(next.key.get()) + + " " + serdes.valueFrom(next.value) + ); + } + } + } + + assertThat(scanResult.getPosition(), + is(Position.fromMap( + mkMap(mkEntry("input-topic", mkMap(mkEntry(0, 0L), mkEntry(1, 1L))))))); + } + + @Test + public void 
shouldScanUncachedTableCollated() { + + final StateSerdes<Integer, ValueAndTimestamp<Integer>> serdes = + kafkaStreams.serdesForStore(UNCACHED_TABLE); + + final InteractiveQueryResult<KeyValueIterator<Bytes, byte[]>> scanResult = + kafkaStreams.query(inStore(UNCACHED_TABLE).withQuery(RawScanQuery.scan())); + + System.out.println("|||" + scanResult); + final Map<Integer, QueryResult<KeyValueIterator<Bytes, byte[]>>> partitionResults = scanResult.getPartitionResults(); + + final List<KeyValueIterator<Bytes, byte[]>> collect = + partitionResults + .values() + .stream() + .map(QueryResult::getResult) + .collect(Collectors.toList()); + try (final CloseableIterator<KeyValue<Bytes, byte[]>> collate = Iterators.collate( + collect)) { + while (collate.hasNext()) { + final KeyValue<Bytes, byte[]> next = collate.next(); + System.out.println( + "|||" + + " " + serdes.keyFrom(next.key.get()) + + " " + serdes.valueFrom(next.value) + ); + } + } + + assertThat(scanResult.getPosition(), + is(Position.fromMap( + mkMap(mkEntry("input-topic", mkMap(mkEntry(0, 0L), mkEntry(1, 1L))))))); + } + + @Test + public void shouldQueryWithinBound() { + + final StateSerdes<Integer, ValueAndTimestamp<Integer>> serdes = + kafkaStreams.serdesForStore(UNCACHED_TABLE); + + final byte[] rawKey = serdes.rawKey(1); + final InteractiveQueryResult<byte[]> result = kafkaStreams.query( + inStore(UNCACHED_TABLE) + .withQuery(RawKeyQuery.withKey(rawKey)) + .withPositionBound( + at( + Position + .emptyPosition() + .withComponent(INPUT_TOPIC_NAME, 0, 0L) + .withComponent(INPUT_TOPIC_NAME, 1, 1L) + ) + ) + ); + + System.out.println("|||" + result); + final QueryResult<byte[]> rawValueResult = result.getPartitionResults().get(0); + final ValueAndTimestamp<Integer> value = + serdes.valueFrom(rawValueResult.getResult()); + System.out.println("|||" + value); + assertThat(result.getPosition(), + is(Position.fromMap( + mkMap(mkEntry("input-topic", mkMap(mkEntry(0, 0L), mkEntry(1, 1L))))))); + } + + @Test + public void 
shouldFailQueryOutsideOfBound() { + + final StateSerdes<Integer, ValueAndTimestamp<Integer>> serdes = + kafkaStreams.serdesForStore(UNCACHED_TABLE); + + final byte[] rawKey = serdes.rawKey(1); + // intentionally setting the bound higher than the current position. + final InteractiveQueryResult<byte[]> result = kafkaStreams.query( + inStore(UNCACHED_TABLE) + .withQuery(RawKeyQuery.withKey(rawKey)) + .withPositionBound( + at( + Position + .emptyPosition() + .withComponent(INPUT_TOPIC_NAME, 0, 1L) + .withComponent(INPUT_TOPIC_NAME, 1, 2L) + ) + ) + ); + + System.out.println("|||" + result); + final QueryResult<byte[]> rawValueResult = result.getPartitionResults().get(0); + + final RuntimeException runtimeException = assertThrows( + RuntimeException.class, + rawValueResult::getResult + ); + assertThat( + runtimeException.getMessage(), + is("NOT_UP_TO_BOUND: For store partition 0, the current position Position{position={input-topic={0=0}}} is not yet up to the bound PositionBound{position=Position{position={input-topic={0=1, 1=2}}}}")); + + assertThat(rawValueResult.isFailure(), is(true)); + assertThat(rawValueResult.getFailureReason(), is(FailureReason.NOT_UP_TO_BOUND)); + assertThat(rawValueResult.getFailure(), + is("For store partition 0, the current position Position{position={input-topic={0=0}}} is not yet up to the bound PositionBound{position=Position{position={input-topic={0=1, 1=2}}}}")); + assertThat(result.getPosition(), is(Position.emptyPosition())); + } + + + @Test + public void shouldPartiallySucceedOnPartialBound() { + + final StateSerdes<Integer, ValueAndTimestamp<Integer>> serdes = + kafkaStreams.serdesForStore(UNCACHED_TABLE); + + final InteractiveQueryResult<KeyValueIterator<Bytes, byte[]>> scanResult = + kafkaStreams.query( + inStore(UNCACHED_TABLE) + .withQuery(RawScanQuery.scan()) + .withPositionBound( + at( + Position + .emptyPosition() + .withComponent(INPUT_TOPIC_NAME, 0, 0L) + .withComponent(INPUT_TOPIC_NAME, 1, 2L) + ) + ) + ); + + 
System.out.println("|||" + scanResult); + final Map<Integer, QueryResult<KeyValueIterator<Bytes, byte[]>>> partitionResults = scanResult.getPartitionResults(); + for (final Entry<Integer, QueryResult<KeyValueIterator<Bytes, byte[]>>> entry : partitionResults.entrySet()) { + final QueryResult<KeyValueIterator<Bytes, byte[]>> value = entry.getValue(); + if (value.isSuccess()) { + try (final KeyValueIterator<Bytes, byte[]> keyValueIterator = + value.getResult()) { + while (keyValueIterator.hasNext()) { + final KeyValue<Bytes, byte[]> next = keyValueIterator.next(); + System.out.println( + "|||" + entry.getKey() + + " " + serdes.keyFrom(next.key.get()) + + " " + serdes.valueFrom(next.value) + ); + } + } + } + } + + assertThat(scanResult.getPartitionResults().get(0).isSuccess(), is(true)); + assertThat(scanResult.getPartitionResults().get(1).isFailure(), is(true)); + assertThat(scanResult.getPartitionResults().get(1).getFailureReason(), + is(FailureReason.NOT_UP_TO_BOUND)); + assertThat(scanResult.getPosition(), + is(Position.fromMap(mkMap(mkEntry("input-topic", mkMap(mkEntry(0, 0L))))))); + } + + private static Properties streamsConfiguration() { + final String safeTestName = IQv2IntegrationTest.class.getName(); + final Properties config = new Properties(); + config.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE); + config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName); + config.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:" + (++port)); + config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); + config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); + config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); + config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); + config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100); + 
config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 200); + config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000); + config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L); + return config; + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java index 066f080..3772904 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java @@ -71,6 +71,7 @@ import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Properties; import java.util.Set; import java.util.UUID; @@ -786,6 +787,7 @@ public class RocksDBStoreTest extends AbstractKeyValueStoreTest { EasyMock.expect(context.appConfigs()) .andStubReturn(new StreamsConfig(StreamsTestUtils.getStreamsConfig()).originals()); EasyMock.expect(context.stateDir()).andStubReturn(dir); + EasyMock.expect(context.recordMetadata()).andReturn(Optional.empty()); EasyMock.replay(context); rocksDBStore.init((StateStoreContext) context, rocksDBStore); @@ -818,6 +820,7 @@ public class RocksDBStoreTest extends AbstractKeyValueStoreTest { EasyMock.expect(context.appConfigs()) .andStubReturn(new StreamsConfig(StreamsTestUtils.getStreamsConfig()).originals()); EasyMock.expect(context.stateDir()).andStubReturn(dir); + EasyMock.expect(context.recordMetadata()).andReturn(Optional.empty()); EasyMock.replay(context); rocksDBStore.init((StateStoreContext) context, rocksDBStore);
