This is an automated email from the ASF dual-hosted git repository. vvcephei pushed a commit to branch iqv2-framework in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 2188078203c429ac3ff44198f3da4d57d4153ea2 Author: John Roesler <[email protected]> AuthorDate: Tue Nov 23 22:49:27 2021 -0600 finalizing... --- .../org/apache/kafka/streams/KafkaStreams.java | 8 +- .../apache/kafka/streams/processor/StateStore.java | 14 +- .../streams/query/InteractiveQueryRequest.java | 157 -------------- .../streams/query/InteractiveQueryResult.java | 72 ------- .../apache/kafka/streams/query/PositionBound.java | 36 +++- .../kafka/streams/query/StateQueryRequest.java | 226 +++++++++++++++++++++ .../kafka/streams/query/StateQueryResult.java | 118 +++++++++++ .../streams/integration/IQv2IntegrationTest.java | 31 ++- .../kafka/streams/query/PositionBoundTest.java | 100 +++++++++ 9 files changed, 505 insertions(+), 257 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 2356096..4b926cb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -63,8 +63,8 @@ import org.apache.kafka.streams.processor.internals.ThreadStateTransitionValidat import org.apache.kafka.streams.processor.internals.TopologyMetadata; import org.apache.kafka.streams.processor.internals.assignment.AssignorError; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; -import org.apache.kafka.streams.query.InteractiveQueryRequest; -import org.apache.kafka.streams.query.InteractiveQueryResult; +import org.apache.kafka.streams.query.StateQueryRequest; +import org.apache.kafka.streams.query.StateQueryResult; import org.apache.kafka.streams.query.PositionBound; import org.apache.kafka.streams.query.QueryResult; import org.apache.kafka.streams.state.HostInfo; @@ -1772,14 +1772,14 @@ public class KafkaStreams implements AutoCloseable { } @Evolving - public <R> InteractiveQueryResult<R> query(final InteractiveQueryRequest<R> request) { + public <R> 
StateQueryResult<R> query(final StateQueryRequest<R> request) { final String storeName = request.getStoreName(); if (!topologyMetadata.hasStore(storeName)) { throw new UnknownStateStoreException( "Cannot get state store " + storeName + " because no such store is registered in the topology." ); } - final InteractiveQueryResult<R> result = new InteractiveQueryResult<>(new HashMap<>()); + final StateQueryResult<R> result = new StateQueryResult<>(); final Map<String, StateStore> globalStateStores = topologyMetadata.globalStateStores(); if (globalStateStores.containsKey(storeName)) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java index b1c480f..2f96020 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.processor; +import org.apache.kafka.common.annotation.InterfaceStability.Evolving; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.internals.StoreToProcessorContextAdapter; @@ -129,19 +130,26 @@ public interface StateStore { * a failure. * <p> * If the store doesn't know how to handle the given query, the result - * will be a {@link FailureReason#UNKNOWN_QUERY_TYPE}. + * shall be a {@link FailureReason#UNKNOWN_QUERY_TYPE}. * If the store couldn't satisfy the given position bound, the result - * will be a {@link FailureReason#NOT_UP_TO_BOUND}. + * shall be a {@link FailureReason#NOT_UP_TO_BOUND}. + * <p> + * Note to store implementers: if your store does not support position tracking, + * you can correctly respond {@link FailureReason#NOT_UP_TO_BOUND} if the argument is + * anything but {@link PositionBound#unbounded()}. 
Be sure to explain in the failure message + * that bounded positions are not supported. + * <p> * @param query The query to execute * @param positionBound The position the store must be at or past * @param collectExecutionInfo Whether the store should collect detailed execution info for the query * @param <R> The result type */ + @Evolving default <R> QueryResult<R> query( Query<R> query, PositionBound positionBound, boolean collectExecutionInfo) { - + // If a store doesn't implement a query handler, then all queries are unknown. return QueryResult.forUnknownQueryType(query, this); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/query/InteractiveQueryRequest.java b/streams/src/main/java/org/apache/kafka/streams/query/InteractiveQueryRequest.java deleted file mode 100644 index 56fc0a8..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/query/InteractiveQueryRequest.java +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.kafka.streams.query; - -import java.util.Collections; -import java.util.HashSet; -import java.util.Objects; -import java.util.Optional; -import java.util.Set; - -/** - * @param <R> - */ -public class InteractiveQueryRequest<R> { - - private final String storeName; - private final PositionBound position; - private final Optional<Set<Integer>> partitions; - private final Query<R> query; - private boolean executionInfoEnabled; - private boolean requireActive; - - private InteractiveQueryRequest( - final String storeName, - final PositionBound position, - final Optional<Set<Integer>> partitions, - final Query<R> query, - final boolean executionInfoEnabled, final boolean requireActive) { - - this.storeName = storeName; - this.position = position; - this.partitions = partitions; - this.query = query; - this.executionInfoEnabled = executionInfoEnabled; - this.requireActive = requireActive; - } - - public static InStore inStore(final String name) { - return new InStore(name); - } - - public InteractiveQueryRequest<R> withPositionBound(final PositionBound positionBound) { - return new InteractiveQueryRequest<>( - storeName, - positionBound, - partitions, - query, - executionInfoEnabled, - false); - } - - - public InteractiveQueryRequest<R> withNoPartitions() { - return new InteractiveQueryRequest<>(storeName, - position, - Optional.of(Collections.emptySet()), - query, - executionInfoEnabled, - requireActive); - } - - public InteractiveQueryRequest<R> withAllPartitions() { - return new InteractiveQueryRequest<>(storeName, - position, - Optional.empty(), - query, - executionInfoEnabled, - requireActive); - } - - public InteractiveQueryRequest<R> withPartitions(final Set<Integer> partitions) { - return new InteractiveQueryRequest<>(storeName, - position, - Optional.of(Collections.unmodifiableSet(new HashSet<>(partitions))), - query, - executionInfoEnabled, - requireActive); - } - - public String getStoreName() { - return storeName; - } - - public 
PositionBound getPositionBound() { - if (requireActive) { - throw new IllegalArgumentException(); - } - return Objects.requireNonNull(position); - } - - public Query<R> getQuery() { - return query; - } - - public boolean isAllPartitions() { - return !partitions.isPresent(); - } - - public Set<Integer> getPartitions() { - if (!partitions.isPresent()) { - throw new UnsupportedOperationException( - "Cannot list partitions of an 'all partitions' request"); - } else { - return partitions.get(); - } - } - - public InteractiveQueryRequest<R> enableExecutionInfo() { - return new InteractiveQueryRequest<>(storeName, - position, - partitions, - query, - true, - requireActive); - } - - public boolean executionInfoEnabled() { - return executionInfoEnabled; - } - - public boolean isRequireActive() { - return requireActive; - } - - public static class InStore { - - private final String name; - - private InStore(final String name) { - this.name = name; - } - - public <R> InteractiveQueryRequest<R> withQuery(final Query<R> query) { - return new InteractiveQueryRequest<>( - name, - null, - Optional.empty(), - query, - false, - true); - } - } -} diff --git a/streams/src/main/java/org/apache/kafka/streams/query/InteractiveQueryResult.java b/streams/src/main/java/org/apache/kafka/streams/query/InteractiveQueryResult.java deleted file mode 100644 index 4f8e728..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/query/InteractiveQueryResult.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.query; - -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; - -public class InteractiveQueryResult<R> { - - private final Map<Integer, QueryResult<R>> partitionResults; - - public InteractiveQueryResult(final Map<Integer, QueryResult<R>> resultMap) { - partitionResults = resultMap; - } - - public void setGlobalResult(final QueryResult<R> r) { - - } - - public void addResult(final int partition, final QueryResult<R> r) { - partitionResults.put(partition, r); - } - - public Map<Integer, QueryResult<R>> getPartitionResults() { - return partitionResults; - } - - public QueryResult<R> getOnlyPartitionResult() { - final List<QueryResult<R>> nonempty = - partitionResults - .values() - .stream() - .filter(r -> r.getResult() != null) - .collect(Collectors.toList()); - - if (nonempty.size() != 1) { - throw new IllegalStateException(); - } else { - return nonempty.get(0); - } - } - - public Position getPosition() { - Position position = Position.emptyPosition(); - for (final QueryResult<R> r : partitionResults.values()) { - position = position.merge(r.getPosition()); - } - return position; - } - - @Override - public String toString() { - return "InteractiveQueryResult{" + - "partitionResults=" + partitionResults + - '}'; - } -} diff --git a/streams/src/main/java/org/apache/kafka/streams/query/PositionBound.java b/streams/src/main/java/org/apache/kafka/streams/query/PositionBound.java index a4dbe35..185a88e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/query/PositionBound.java 
+++ b/streams/src/main/java/org/apache/kafka/streams/query/PositionBound.java @@ -17,8 +17,17 @@ package org.apache.kafka.streams.query; +import org.apache.kafka.common.annotation.InterfaceStability.Evolving; + import java.util.Objects; +/** + * A class bounding the processing state Position during queries. + * This can be used to specify that a query should fail if the + * locally available partition isn't caught up to the specified bound. + * "Unbounded" places no restrictions on the current location of the partition. + */ +@Evolving public class PositionBound { private final Position position; @@ -27,26 +36,45 @@ public class PositionBound { private PositionBound(final Position position, final boolean unbounded) { if (unbounded && position != null) { throw new IllegalArgumentException(); + } else if (position != null) { + this.position = position.copy(); + this.unbounded = false; + } else { + this.position = null; + this.unbounded = unbounded; } - this.position = position; - this.unbounded = unbounded; } + /** + * Creates a new PositionBound representing "no bound" + */ public static PositionBound unbounded() { return new PositionBound(null, true); } + /** + * Creates a new PositionBound representing a specific position. + */ public static PositionBound at(final Position position) { return new PositionBound(position, false); } + /** + * Returns true iff this object specifies that there is no position bound. + */ public boolean isUnbounded() { return unbounded; } + /** + * Returns the specific position of this bound. + * @throws IllegalArgumentException if this is an "unbounded" position. + */ public Position position() { if (unbounded) { - throw new IllegalArgumentException(); + throw new IllegalArgumentException( + "Cannot get the position of an unbounded PositionBound." 
+ ); } else { return position; } @@ -75,6 +103,6 @@ public class PositionBound { @Override public int hashCode() { - return Objects.hash(position, unbounded); + throw new UnsupportedOperationException("This mutable object is not suitable as a hash key"); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/query/StateQueryRequest.java b/streams/src/main/java/org/apache/kafka/streams/query/StateQueryRequest.java new file mode 100644 index 0000000..1f3dbf0 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/query/StateQueryRequest.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.query; + +import org.apache.kafka.common.annotation.InterfaceStability.Evolving; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; + +/** + * The request object for Interactive Queries. This is an immutable builder class for passing all + * required and optional arguments for querying a state store in Kafka Streams. + * <p> + * + * @param <R> The type of the query result. 
+ */ +@Evolving +public class StateQueryRequest<R> { + + private final String storeName; + private final PositionBound position; + private final Optional<Set<Integer>> partitions; + private final Query<R> query; + private final boolean executionInfoEnabled; + private final boolean requireActive; + + private StateQueryRequest( + final String storeName, + final PositionBound position, + final Optional<Set<Integer>> partitions, + final Query<R> query, + final boolean executionInfoEnabled, + final boolean requireActive) { + + this.storeName = storeName; + this.position = position; + this.partitions = partitions; + this.query = query; + this.executionInfoEnabled = executionInfoEnabled; + this.requireActive = requireActive; + } + + /** + * Specifies the name of the store to query. + */ + public static InStore inStore(final String name) { + return new InStore(name); + } + + /** + * Bounds the position of the state store against its input topics. + */ + public StateQueryRequest<R> withPositionBound(final PositionBound positionBound) { + return new StateQueryRequest<>( + storeName, + positionBound, + partitions, + query, + executionInfoEnabled, + requireActive + ); + } + + + /** + * Specifies that the query will run against all locally available partitions. + */ + public StateQueryRequest<R> withAllPartitions() { + return new StateQueryRequest<>( + storeName, + position, + Optional.empty(), + query, + executionInfoEnabled, + requireActive + ); + } + + /** + * Specifies a set of partitions to run against. If some partitions are not locally available, + * the response will contain a {@link FailureReason#NOT_PRESENT} for those partitions. If + * some partitions in this set are not valid partitions for the store, the response will + * contain a {@link FailureReason#DOES_NOT_EXIST} for those partitions. 
+ */ + public StateQueryRequest<R> withPartitions(final Set<Integer> partitions) { + return new StateQueryRequest<>( + storeName, + position, + Optional.of(Collections.unmodifiableSet(new HashSet<>(partitions))), + query, + executionInfoEnabled, + requireActive + ); + } + + /** + * Requests for stores and the Streams runtime to record any useful details about + * how the query was executed. + */ + public StateQueryRequest<R> enableExecutionInfo() { + return new StateQueryRequest<>( + storeName, + position, + partitions, + query, + true, + requireActive + ); + } + + /** + * Specifies that this query should only run on partitions for which this instance is + * the leader (aka "active"). Partitions for which this instance is not the active replica + * will return {@link FailureReason#NOT_UP_TO_BOUND}. + */ + public StateQueryRequest<R> requireActive() { + return new StateQueryRequest<>( + storeName, + position, + partitions, + query, + executionInfoEnabled, + true + ); + } + + /** + * The name of the store this request is for. + */ + public String getStoreName() { + return storeName; + } + + /** + * The bound that this request places on its query, in terms of the partitions' positions + * against its inputs. + */ + public PositionBound getPositionBound() { + return position; + } + + /** + * The query this request is meant to run. + */ + public Query<R> getQuery() { + return query; + } + + /** + * Whether this request should fetch from all locally available partitions. + */ + public boolean isAllPartitions() { + return !partitions.isPresent(); + } + + /** + * If the request is for specific partitions, return the set of partitions to query. 
+ * + * @throws IllegalStateException if this is a request for all partitions + */ + public Set<Integer> getPartitions() { + if (!partitions.isPresent()) { + throw new IllegalStateException( + "Cannot list partitions of an 'all partitions' request"); + } else { + return partitions.get(); + } + } + + /** + * Whether the request includes detailed execution information. + */ + public boolean executionInfoEnabled() { + return executionInfoEnabled; + } + + /** + * Whether this request requires the query to execute only on active partitions. + */ + public boolean isRequireActive() { + return requireActive; + } + + /** + * A progressive builder interface for creating {@code StateQueryRequest}s. + */ + public static class InStore { + + private final String name; + + private InStore(final String name) { + this.name = name; + } + + /** + * Specifies the query to run on the specified store. + */ + public <R> StateQueryRequest<R> withQuery(final Query<R> query) { + return new StateQueryRequest<>( + name, // name is already specified + PositionBound.unbounded(), // default: unbounded + Optional.empty(), // default: all partitions + query, // the query is specified + false, // default: no execution info + false // default: don't require active + ); + } + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/query/StateQueryResult.java b/streams/src/main/java/org/apache/kafka/streams/query/StateQueryResult.java new file mode 100644 index 0000000..06cc6b4 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/query/StateQueryResult.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.query; + +import org.apache.kafka.common.annotation.InterfaceStability.Evolving; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * The response object for interactive queries. + * This wraps the individual partition results, as well as + * metadata relating to the result as a whole. + * <p> + * @param <R> The type of the query result. + */ +@Evolving +public class StateQueryResult<R> { + private final Map<Integer, QueryResult<R>> partitionResults = new HashMap<>(); + private QueryResult<R> globalResult = null; + + /** + * Set the result for a global store query. + * Used by Kafka Streams and available for tests. + */ + public void setGlobalResult(final QueryResult<R> r) { + this.globalResult = r; + } + + /** + * Set the result for a partitioned store query. + * Used by Kafka Streams and available for tests. + */ + public void addResult(final int partition, final QueryResult<R> r) { + partitionResults.put(partition, r); + } + + + /** + * The query's result for each partition that executed the query. + * Empty for global store queries. + */ + public Map<Integer, QueryResult<R>> getPartitionResults() { + return partitionResults; + } + + /** + * For queries that are expected to match records in only one + * partition, returns the result. + * + * @throws IllegalArgumentException if the results are not for exactly one partition. 
+ */ + public QueryResult<R> getOnlyPartitionResult() { + final List<QueryResult<R>> nonempty = + partitionResults + .values() + .stream() + .filter(r -> r.getResult() != null) + .collect(Collectors.toList()); + + if (nonempty.size() != 1) { + throw new IllegalArgumentException( + "The query did not return exactly one partition result: " + partitionResults + ); + } else { + return nonempty.get(0); + } + } + + /** + * The query's result for global store queries. Is {@code null} for + * non-global (partitioned) store queries. + */ + public QueryResult<R> getGlobalResult() { + return globalResult; + } + + /** + * The position of the state store at the moment it executed the + * query. In conjunction + * with {@link StateQueryRequest#withPositionBound}, this can be + * used to achieve a good balance between consistency and + * availability in which repeated queries are guaranteed to + * advance in time while allowing reads to be served from any + * replica that is caught up to that caller's prior observations. 
+ */ + public Position getPosition() { + Position position = Position.emptyPosition(); + for (final QueryResult<R> r : partitionResults.values()) { + position = position.merge(r.getPosition()); + } + return position; + } + + @Override + public String toString() { + return "StateQueryResult{" + + "partitionResults=" + partitionResults + + ", globalResult=" + globalResult + + '}'; + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java index 23374f3..40f0062 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java @@ -26,7 +26,6 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.KeyValueTimestamp; -import org.apache.kafka.streams.StoreQueryParameters; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; @@ -37,8 +36,8 @@ import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.internals.TimeWindow; import org.apache.kafka.streams.query.FailureReason; -import org.apache.kafka.streams.query.InteractiveQueryRequest; -import org.apache.kafka.streams.query.InteractiveQueryResult; +import org.apache.kafka.streams.query.StateQueryRequest; +import org.apache.kafka.streams.query.StateQueryResult; import org.apache.kafka.streams.query.Iterators; import org.apache.kafka.streams.query.KeyQuery; import org.apache.kafka.streams.query.Position; @@ -47,8 +46,6 @@ import org.apache.kafka.streams.query.RawKeyQuery; import org.apache.kafka.streams.query.RawScanQuery; import 
org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; -import org.apache.kafka.streams.state.QueryableStoreTypes; -import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; import org.apache.kafka.streams.state.StateSerdes; import org.apache.kafka.streams.state.ValueAndTimestamp; import org.apache.kafka.streams.state.WindowStore; @@ -76,7 +73,7 @@ import java.util.stream.Collectors; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; -import static org.apache.kafka.streams.query.InteractiveQueryRequest.inStore; +import static org.apache.kafka.streams.query.StateQueryRequest.inStore; import static org.apache.kafka.streams.query.PositionBound.at; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; @@ -189,7 +186,7 @@ public class IQv2IntegrationTest { kafkaStreams.serdesForStore(CACHED_TABLE); final byte[] rawKey = serdes.rawKey(1); - final InteractiveQueryResult<byte[]> result = kafkaStreams.query( + final StateQueryResult<byte[]> result = kafkaStreams.query( inStore(CACHED_TABLE).withQuery(RawKeyQuery.withKey(rawKey))); System.out.println("|||" + result); @@ -211,7 +208,7 @@ public class IQv2IntegrationTest { kafkaStreams.serdesForStore(UNCACHED_TABLE); final byte[] rawKey = serdes.rawKey(1); - final InteractiveQueryResult<byte[]> result = kafkaStreams.query( + final StateQueryResult<byte[]> result = kafkaStreams.query( inStore(UNCACHED_TABLE).withQuery(RawKeyQuery.withKey(rawKey))); System.out.println("|||" + result); @@ -230,10 +227,10 @@ public class IQv2IntegrationTest { public void shouldQueryTypedKeyFromUncachedTable() { final Integer key = 1; - final InteractiveQueryRequest<ValueAndTimestamp<Integer>> query = + final StateQueryRequest<ValueAndTimestamp<Integer>> query = inStore(UNCACHED_TABLE).withQuery(KeyQuery.withKey(key)); - final InteractiveQueryResult<ValueAndTimestamp<Integer>> result = 
kafkaStreams.query(query); + final StateQueryResult<ValueAndTimestamp<Integer>> result = kafkaStreams.query(query); final ValueAndTimestamp<Integer> value = result.getOnlyPartitionResult().getResult(); @@ -247,10 +244,10 @@ public class IQv2IntegrationTest { public void exampleKeyQueryIntoWindowStore() { final Windowed<Integer> key = new Windowed<>(1, new TimeWindow(0L, 99L)); - final InteractiveQueryRequest<ValueAndTimestamp<Long>> query = + final StateQueryRequest<ValueAndTimestamp<Long>> query = inStore(UNCACHED_COUNTS_TABLE).withQuery(KeyQuery.withKey(key)); - final InteractiveQueryResult<ValueAndTimestamp<Long>> result = kafkaStreams.query(query); + final StateQueryResult<ValueAndTimestamp<Long>> result = kafkaStreams.query(query); final ValueAndTimestamp<Long> value = result.getOnlyPartitionResult().getResult(); @@ -263,7 +260,7 @@ public class IQv2IntegrationTest { final StateSerdes<Integer, ValueAndTimestamp<Integer>> serdes = kafkaStreams.serdesForStore(UNCACHED_TABLE); - final InteractiveQueryResult<KeyValueIterator<Bytes, byte[]>> scanResult = + final StateQueryResult<KeyValueIterator<Bytes, byte[]>> scanResult = kafkaStreams.query(inStore(UNCACHED_TABLE).withQuery(RawScanQuery.scan())); System.out.println("|||" + scanResult); @@ -294,7 +291,7 @@ public class IQv2IntegrationTest { final StateSerdes<Integer, ValueAndTimestamp<Integer>> serdes = kafkaStreams.serdesForStore(UNCACHED_TABLE); - final InteractiveQueryResult<KeyValueIterator<Bytes, byte[]>> scanResult = + final StateQueryResult<KeyValueIterator<Bytes, byte[]>> scanResult = kafkaStreams.query(inStore(UNCACHED_TABLE).withQuery(RawScanQuery.scan())); System.out.println("|||" + scanResult); @@ -330,7 +327,7 @@ public class IQv2IntegrationTest { kafkaStreams.serdesForStore(UNCACHED_TABLE); final byte[] rawKey = serdes.rawKey(1); - final InteractiveQueryResult<byte[]> result = kafkaStreams.query( + final StateQueryResult<byte[]> result = kafkaStreams.query( inStore(UNCACHED_TABLE) 
.withQuery(RawKeyQuery.withKey(rawKey)) .withPositionBound( @@ -361,7 +358,7 @@ public class IQv2IntegrationTest { final byte[] rawKey = serdes.rawKey(1); // intentionally setting the bound higher than the current position. - final InteractiveQueryResult<byte[]> result = kafkaStreams.query( + final StateQueryResult<byte[]> result = kafkaStreams.query( inStore(UNCACHED_TABLE) .withQuery(RawKeyQuery.withKey(rawKey)) .withPositionBound( @@ -399,7 +396,7 @@ public class IQv2IntegrationTest { final StateSerdes<Integer, ValueAndTimestamp<Integer>> serdes = kafkaStreams.serdesForStore(UNCACHED_TABLE); - final InteractiveQueryResult<KeyValueIterator<Bytes, byte[]>> scanResult = + final StateQueryResult<KeyValueIterator<Bytes, byte[]>> scanResult = kafkaStreams.query( inStore(UNCACHED_TABLE) .withQuery(RawScanQuery.scan()) diff --git a/streams/src/test/java/org/apache/kafka/streams/query/PositionBoundTest.java b/streams/src/test/java/org/apache/kafka/streams/query/PositionBoundTest.java new file mode 100644 index 0000000..41175c2 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/query/PositionBoundTest.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.streams.query; + +import org.junit.Test; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; + +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.common.utils.Utils.mkSet; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + +public class PositionBoundTest { + + @Test + public void shouldCopyPosition() { + final Position position = Position.emptyPosition(); + final PositionBound positionBound = PositionBound.at(position); + position.withComponent("topic", 1, 2L); + + assertThat(position.getTopics(), equalTo(mkSet("topic"))); + assertThat(positionBound.position().getTopics(), empty()); + } + + @Test + public void unboundedShouldBeUnbounded() { + final PositionBound bound = PositionBound.unbounded(); + assertTrue(bound.isUnbounded()); + } + + @Test + public void unboundedShouldThrowOnPosition() { + final PositionBound bound = PositionBound.unbounded(); + assertThrows(IllegalArgumentException.class, bound::position); + } + + @Test + public void shouldEqualPosition() { + final PositionBound bound1 = PositionBound.at(Position.emptyPosition()); + final PositionBound bound2 = PositionBound.at(Position.emptyPosition()); + assertEquals(bound1, bound2); + } + + @Test + public void shouldEqualUnbounded() { + final PositionBound bound1 = PositionBound.unbounded(); + final PositionBound bound2 = PositionBound.unbounded(); + assertEquals(bound1, bound2); + } + + @Test + public void shouldEqualSelf() { + final PositionBound bound1 = PositionBound.at(Position.emptyPosition()); + assertEquals(bound1, bound1); + } + + @Test 
+ public void shouldNotEqualNull() { + final PositionBound bound1 = PositionBound.at(Position.emptyPosition()); + assertNotEquals(bound1, null); + } + + @Test + public void shouldNotHash() { + final PositionBound bound = PositionBound.at(Position.emptyPosition()); + assertThrows(UnsupportedOperationException.class, bound::hashCode); + + // going overboard... + final HashSet<PositionBound> set = new HashSet<>(); + assertThrows(UnsupportedOperationException.class, () -> set.add(bound)); + + final HashMap<PositionBound, Integer> map = new HashMap<>(); + assertThrows(UnsupportedOperationException.class, () -> map.put(bound, 5)); + } +}
