This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch iqv2-framework
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 2188078203c429ac3ff44198f3da4d57d4153ea2
Author: John Roesler <[email protected]>
AuthorDate: Tue Nov 23 22:49:27 2021 -0600

    finalizing...
---
 .../org/apache/kafka/streams/KafkaStreams.java     |   8 +-
 .../apache/kafka/streams/processor/StateStore.java |  14 +-
 .../streams/query/InteractiveQueryRequest.java     | 157 --------------
 .../streams/query/InteractiveQueryResult.java      |  72 -------
 .../apache/kafka/streams/query/PositionBound.java  |  36 +++-
 .../kafka/streams/query/StateQueryRequest.java     | 226 +++++++++++++++++++++
 .../kafka/streams/query/StateQueryResult.java      | 118 +++++++++++
 .../streams/integration/IQv2IntegrationTest.java   |  31 ++-
 .../kafka/streams/query/PositionBoundTest.java     | 100 +++++++++
 9 files changed, 505 insertions(+), 257 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java 
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 2356096..4b926cb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -63,8 +63,8 @@ import 
org.apache.kafka.streams.processor.internals.ThreadStateTransitionValidat
 import org.apache.kafka.streams.processor.internals.TopologyMetadata;
 import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.query.InteractiveQueryRequest;
-import org.apache.kafka.streams.query.InteractiveQueryResult;
+import org.apache.kafka.streams.query.StateQueryRequest;
+import org.apache.kafka.streams.query.StateQueryResult;
 import org.apache.kafka.streams.query.PositionBound;
 import org.apache.kafka.streams.query.QueryResult;
 import org.apache.kafka.streams.state.HostInfo;
@@ -1772,14 +1772,14 @@ public class KafkaStreams implements AutoCloseable {
     }
 
     @Evolving
-    public <R> InteractiveQueryResult<R> query(final 
InteractiveQueryRequest<R> request) {
+    public <R> StateQueryResult<R> query(final StateQueryRequest<R> request) {
         final String storeName = request.getStoreName();
         if (!topologyMetadata.hasStore(storeName)) {
             throw new UnknownStateStoreException(
                 "Cannot get state store " + storeName + " because no such 
store is registered in the topology."
             );
         }
-        final InteractiveQueryResult<R> result = new 
InteractiveQueryResult<>(new HashMap<>());
+        final StateQueryResult<R> result = new StateQueryResult<>();
 
         final Map<String, StateStore> globalStateStores = 
topologyMetadata.globalStateStores();
         if (globalStateStores.containsKey(storeName)) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java 
b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
index b1c480f..2f96020 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.processor;
 
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import 
org.apache.kafka.streams.processor.internals.StoreToProcessorContextAdapter;
@@ -129,19 +130,26 @@ public interface StateStore {
      * a failure.
      * <p>
      * If the store doesn't know how to handle the given query, the result
-     * will be a {@link FailureReason#UNKNOWN_QUERY_TYPE}.
+     * shall be a {@link FailureReason#UNKNOWN_QUERY_TYPE}.
      * If the store couldn't satisfy the given position bound, the result
-     * will be a {@link FailureReason#NOT_UP_TO_BOUND}.
+     * shall be a {@link FailureReason#NOT_UP_TO_BOUND}.
+     * <p>
+     * Note to store implementers: if your store does not support position 
tracking,
+     * you can correctly respond {@link FailureReason#NOT_UP_TO_BOUND} if the 
argument is
+     * anything but {@link PositionBound#unbounded()}. Be sure to explain in 
the failure message
+     * that bounded positions are not supported.
+     * <p>
      * @param query The query to execute
      * @param positionBound The position the store must be at or past
      * @param collectExecutionInfo Whether the store should collect detailed 
execution info for the query
      * @param <R> The result type
      */
+    @Evolving
     default <R> QueryResult<R> query(
         Query<R> query,
         PositionBound positionBound,
         boolean collectExecutionInfo) {
-
+        // If a store doesn't implement a query handler, then all queries are 
unknown.
         return QueryResult.forUnknownQueryType(query, this);
     }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/query/InteractiveQueryRequest.java
 
b/streams/src/main/java/org/apache/kafka/streams/query/InteractiveQueryRequest.java
deleted file mode 100644
index 56fc0a8..0000000
--- 
a/streams/src/main/java/org/apache/kafka/streams/query/InteractiveQueryRequest.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.query;
-
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Set;
-
-/**
- * @param <R>
- */
-public class InteractiveQueryRequest<R> {
-
-    private final String storeName;
-    private final PositionBound position;
-    private final Optional<Set<Integer>> partitions;
-    private final Query<R> query;
-    private boolean executionInfoEnabled;
-    private boolean requireActive;
-
-    private InteractiveQueryRequest(
-        final String storeName,
-        final PositionBound position,
-        final Optional<Set<Integer>> partitions,
-        final Query<R> query,
-        final boolean executionInfoEnabled, final boolean requireActive) {
-
-        this.storeName = storeName;
-        this.position = position;
-        this.partitions = partitions;
-        this.query = query;
-        this.executionInfoEnabled = executionInfoEnabled;
-        this.requireActive = requireActive;
-    }
-
-    public static InStore inStore(final String name) {
-        return new InStore(name);
-    }
-
-    public InteractiveQueryRequest<R> withPositionBound(final PositionBound 
positionBound) {
-        return new InteractiveQueryRequest<>(
-            storeName,
-            positionBound,
-            partitions,
-            query,
-            executionInfoEnabled,
-            false);
-    }
-
-
-    public InteractiveQueryRequest<R> withNoPartitions() {
-        return new InteractiveQueryRequest<>(storeName,
-            position,
-            Optional.of(Collections.emptySet()),
-            query,
-            executionInfoEnabled,
-            requireActive);
-    }
-
-    public InteractiveQueryRequest<R> withAllPartitions() {
-        return new InteractiveQueryRequest<>(storeName,
-            position,
-            Optional.empty(),
-            query,
-            executionInfoEnabled,
-            requireActive);
-    }
-
-    public InteractiveQueryRequest<R> withPartitions(final Set<Integer> 
partitions) {
-        return new InteractiveQueryRequest<>(storeName,
-            position,
-            Optional.of(Collections.unmodifiableSet(new 
HashSet<>(partitions))),
-            query,
-            executionInfoEnabled,
-            requireActive);
-    }
-
-    public String getStoreName() {
-        return storeName;
-    }
-
-    public PositionBound getPositionBound() {
-        if (requireActive) {
-            throw new IllegalArgumentException();
-        }
-        return Objects.requireNonNull(position);
-    }
-
-    public Query<R> getQuery() {
-        return query;
-    }
-
-    public boolean isAllPartitions() {
-        return !partitions.isPresent();
-    }
-
-    public Set<Integer> getPartitions() {
-        if (!partitions.isPresent()) {
-            throw new UnsupportedOperationException(
-                "Cannot list partitions of an 'all partitions' request");
-        } else {
-            return partitions.get();
-        }
-    }
-
-    public InteractiveQueryRequest<R> enableExecutionInfo() {
-        return new InteractiveQueryRequest<>(storeName,
-            position,
-            partitions,
-            query,
-            true,
-            requireActive);
-    }
-
-    public boolean executionInfoEnabled() {
-        return executionInfoEnabled;
-    }
-
-    public boolean isRequireActive() {
-        return requireActive;
-    }
-
-    public static class InStore {
-
-        private final String name;
-
-        private InStore(final String name) {
-            this.name = name;
-        }
-
-        public <R> InteractiveQueryRequest<R> withQuery(final Query<R> query) {
-            return new InteractiveQueryRequest<>(
-                name,
-                null,
-                Optional.empty(),
-                query,
-                false,
-                true);
-        }
-    }
-}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/query/InteractiveQueryResult.java
 
b/streams/src/main/java/org/apache/kafka/streams/query/InteractiveQueryResult.java
deleted file mode 100644
index 4f8e728..0000000
--- 
a/streams/src/main/java/org/apache/kafka/streams/query/InteractiveQueryResult.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.query;
-
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-public class InteractiveQueryResult<R> {
-
-    private final Map<Integer, QueryResult<R>> partitionResults;
-
-    public InteractiveQueryResult(final Map<Integer, QueryResult<R>> 
resultMap) {
-        partitionResults = resultMap;
-    }
-
-    public void setGlobalResult(final QueryResult<R> r) {
-
-    }
-
-    public void addResult(final int partition, final QueryResult<R> r) {
-        partitionResults.put(partition, r);
-    }
-
-    public Map<Integer, QueryResult<R>> getPartitionResults() {
-        return partitionResults;
-    }
-
-    public QueryResult<R> getOnlyPartitionResult() {
-        final List<QueryResult<R>> nonempty =
-            partitionResults
-                .values()
-                .stream()
-                .filter(r -> r.getResult() != null)
-                .collect(Collectors.toList());
-
-        if (nonempty.size() != 1) {
-            throw new IllegalStateException();
-        } else {
-            return nonempty.get(0);
-        }
-    }
-
-    public Position getPosition() {
-        Position position = Position.emptyPosition();
-        for (final QueryResult<R> r : partitionResults.values()) {
-            position = position.merge(r.getPosition());
-        }
-        return position;
-    }
-
-    @Override
-    public String toString() {
-        return "InteractiveQueryResult{" +
-            "partitionResults=" + partitionResults +
-            '}';
-    }
-}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/query/PositionBound.java 
b/streams/src/main/java/org/apache/kafka/streams/query/PositionBound.java
index a4dbe35..185a88e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/query/PositionBound.java
+++ b/streams/src/main/java/org/apache/kafka/streams/query/PositionBound.java
@@ -17,8 +17,17 @@
 package org.apache.kafka.streams.query;
 
 
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+
 import java.util.Objects;
 
+/**
+ * A class bounding the processing state Position during queries.
+ * This can be used to specify that a query should fail if the
+ * locally available partition isn't caught up to the specified bound.
+ * "Unbounded" places no restrictions on the current location of the partition.
+ */
+@Evolving
 public class PositionBound {
 
     private final Position position;
@@ -27,26 +36,45 @@ public class PositionBound {
     private PositionBound(final Position position, final boolean unbounded) {
         if (unbounded && position != null) {
             throw new IllegalArgumentException();
+        } else if (position != null) {
+            this.position = position.copy();
+            this.unbounded = false;
+        } else {
+            this.position = null;
+            this.unbounded = unbounded;
         }
-        this.position = position;
-        this.unbounded = unbounded;
     }
 
+    /**
+     * Creates a new PositionBound representing "no bound".
+     */
     public static PositionBound unbounded() {
         return new PositionBound(null, true);
     }
 
+    /**
+     * Creates a new PositionBound representing a specific position.
+     */
     public static PositionBound at(final Position position) {
         return new PositionBound(position, false);
     }
 
+    /**
+     * Returns true iff this object specifies that there is no position bound.
+     */
     public boolean isUnbounded() {
         return unbounded;
     }
 
+    /**
+     * Returns the specific position of this bound.
+     * @throws IllegalArgumentException if this is an "unbounded" position.
+     */
     public Position position() {
         if (unbounded) {
-            throw new IllegalArgumentException();
+            throw new IllegalArgumentException(
+                "Cannot get the position of an unbounded PositionBound."
+            );
         } else {
             return position;
         }
@@ -75,6 +103,6 @@ public class PositionBound {
 
     @Override
     public int hashCode() {
-        return Objects.hash(position, unbounded);
+        throw new UnsupportedOperationException("This mutable object is not 
suitable as a hash key");
     }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/query/StateQueryRequest.java 
b/streams/src/main/java/org/apache/kafka/streams/query/StateQueryRequest.java
new file mode 100644
index 0000000..1f3dbf0
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/query/StateQueryRequest.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * The request object for Interactive Queries. This is an immutable builder 
class for passing all
+ * required and optional arguments for querying a state store in Kafka Streams.
+ * <p>
+ *
+ * @param <R> The type of the query result.
+ */
+@Evolving
+public class StateQueryRequest<R> {
+
+    private final String storeName;
+    private final PositionBound position;
+    private final Optional<Set<Integer>> partitions;
+    private final Query<R> query;
+    private final boolean executionInfoEnabled;
+    private final boolean requireActive;
+
+    private StateQueryRequest(
+        final String storeName,
+        final PositionBound position,
+        final Optional<Set<Integer>> partitions,
+        final Query<R> query,
+        final boolean executionInfoEnabled,
+        final boolean requireActive) {
+
+        this.storeName = storeName;
+        this.position = position;
+        this.partitions = partitions;
+        this.query = query;
+        this.executionInfoEnabled = executionInfoEnabled;
+        this.requireActive = requireActive;
+    }
+
+    /**
+     * Specifies the name of the store to query.
+     */
+    public static InStore inStore(final String name) {
+        return new InStore(name);
+    }
+
+    /**
+     * Bounds the position of the state store against its input topics.
+     */
+    public StateQueryRequest<R> withPositionBound(final PositionBound 
positionBound) {
+        return new StateQueryRequest<>(
+            storeName,
+            positionBound,
+            partitions,
+            query,
+            executionInfoEnabled,
+            requireActive
+        );
+    }
+
+
+    /**
+     * Specifies that the query will run against all locally available 
partitions.
+     */
+    public StateQueryRequest<R> withAllPartitions() {
+        return new StateQueryRequest<>(
+            storeName,
+            position,
+            Optional.empty(),
+            query,
+            executionInfoEnabled,
+            requireActive
+        );
+    }
+
+    /**
+     * Specifies a set of partitions to run against. If some partitions are 
not locally available,
+     * the response will contain a {@link FailureReason#NOT_PRESENT} for those 
partitions. If
+     * some partitions in this set are not valid partitions for the store, the 
response will
+     * contain a {@link FailureReason#DOES_NOT_EXIST} for those partitions.
+     */
+    public StateQueryRequest<R> withPartitions(final Set<Integer> partitions) {
+        return new StateQueryRequest<>(
+            storeName,
+            position,
+            Optional.of(Collections.unmodifiableSet(new 
HashSet<>(partitions))),
+            query,
+            executionInfoEnabled,
+            requireActive
+        );
+    }
+
+    /**
+     * Requests that stores and the Streams runtime record any useful 
details about
+     * how the query was executed.
+     */
+    public StateQueryRequest<R> enableExecutionInfo() {
+        return new StateQueryRequest<>(
+            storeName,
+            position,
+            partitions,
+            query,
+            true,
+            requireActive
+        );
+    }
+
+    /**
+     * Specifies that this query should only run on partitions for which this 
instance is
+     * the leader (aka "active"). Partitions for which this instance is not 
the active replica
+     * will return {@link FailureReason#NOT_UP_TO_BOUND}.
+     */
+    public StateQueryRequest<R> requireActive() {
+        return new StateQueryRequest<>(
+            storeName,
+            position,
+            partitions,
+            query,
+            executionInfoEnabled,
+            true
+        );
+    }
+
+    /**
+     * The name of the store this request is for.
+     */
+    public String getStoreName() {
+        return storeName;
+    }
+
+    /**
+     * The bound that this request places on its query, in terms of the 
partitions' positions
+     * against its inputs.
+     */
+    public PositionBound getPositionBound() {
+        return position;
+    }
+
+    /**
+     * The query this request is meant to run.
+     */
+    public Query<R> getQuery() {
+        return query;
+    }
+
+    /**
+     * Whether this request should fetch from all locally available partitions.
+     */
+    public boolean isAllPartitions() {
+        return !partitions.isPresent();
+    }
+
+    /**
+     * If the request is for specific partitions, return the set of partitions 
to query.
+     *
+     * @throws IllegalStateException if this is a request for all partitions
+     */
+    public Set<Integer> getPartitions() {
+        if (!partitions.isPresent()) {
+            throw new IllegalStateException(
+                "Cannot list partitions of an 'all partitions' request");
+        } else {
+            return partitions.get();
+        }
+    }
+
+    /**
+     * Whether the request includes detailed execution information.
+     */
+    public boolean executionInfoEnabled() {
+        return executionInfoEnabled;
+    }
+
+    /**
+     * Whether this request requires the query to execute only on active 
partitions.
+     */
+    public boolean isRequireActive() {
+        return requireActive;
+    }
+
+    /**
+     * A progressive builder interface for creating {@code StateQueryRequest}s.
+     */
+    public static class InStore {
+
+        private final String name;
+
+        private InStore(final String name) {
+            this.name = name;
+        }
+
+        /**
+         * Specifies the query to run on the specified store.
+         */
+        public <R> StateQueryRequest<R> withQuery(final Query<R> query) {
+            return new StateQueryRequest<>(
+                name, // name is already specified
+                PositionBound.unbounded(), // default: unbounded
+                Optional.empty(), // default: all partitions
+                query, // the query is specified
+                false, // default: no execution info
+                false // default: don't require active
+            );
+        }
+    }
+}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/query/StateQueryResult.java 
b/streams/src/main/java/org/apache/kafka/streams/query/StateQueryResult.java
new file mode 100644
index 0000000..06cc6b4
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/query/StateQueryResult.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * The response object for interactive queries.
+ * This wraps the individual partition results, as well as
+ * metadata relating to the result as a whole.
+ * <p>
+ * @param <R> The type of the query result.
+ */
+@Evolving
+public class StateQueryResult<R> {
+    private final Map<Integer, QueryResult<R>> partitionResults = new 
HashMap<>();
+    private QueryResult<R> globalResult = null;
+
+    /**
+     * Set the result for a global store query.
+     * Used by Kafka Streams and available for tests.
+     */
+    public void setGlobalResult(final QueryResult<R> r) {
+        this.globalResult = r;
+    }
+
+    /**
+     * Set the result for a partitioned store query.
+     * Used by Kafka Streams and available for tests.
+     */
+    public void addResult(final int partition, final QueryResult<R> r) {
+        partitionResults.put(partition, r);
+    }
+
+
+    /**
+     * The query's result for each partition that executed the query.
+     * Empty for global store queries.
+     */
+    public Map<Integer, QueryResult<R>> getPartitionResults() {
+        return partitionResults;
+    }
+
+    /**
+     * For queries that are expected to match records in only one
+     * partition, returns the result.
+     *
+     * @throws IllegalArgumentException if the results are not for exactly one 
partition.
+     */
+    public QueryResult<R> getOnlyPartitionResult() {
+        final List<QueryResult<R>> nonempty =
+            partitionResults
+                .values()
+                .stream()
+                .filter(r -> r.getResult() != null)
+                .collect(Collectors.toList());
+
+        if (nonempty.size() != 1) {
+            throw new IllegalArgumentException(
+                "The query did not return exactly one partition result: " + 
partitionResults
+            );
+        } else {
+            return nonempty.get(0);
+        }
+    }
+
+    /**
+     * The query's result for global store queries. Is {@code null} for
+     * non-global (partitioned) store queries.
+     */
+    public QueryResult<R> getGlobalResult() {
+        return globalResult;
+    }
+
+    /**
+     * The position of the state store at the moment it executed the
+     * query. In conjunction
+     * with {@link StateQueryRequest#withPositionBound}, this can be
+     * used to achieve a good balance between consistency and
+     * availability in which repeated queries are guaranteed to
+     * advance in time while allowing reads to be served from any
+     * replica that is caught up to that caller's prior observations.
+     */
+    public Position getPosition() {
+        Position position = Position.emptyPosition();
+        for (final QueryResult<R> r : partitionResults.values()) {
+            position = position.merge(r.getPosition());
+        }
+        return position;
+    }
+
+    @Override
+    public String toString() {
+        return "StateQueryResult{" +
+            "partitionResults=" + partitionResults +
+            ", globalResult=" + globalResult +
+            '}';
+    }
+}
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java
index 23374f3..40f0062 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java
@@ -26,7 +26,6 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.KeyValueTimestamp;
-import org.apache.kafka.streams.StoreQueryParameters;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
@@ -37,8 +36,8 @@ import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.TimeWindow;
 import org.apache.kafka.streams.query.FailureReason;
-import org.apache.kafka.streams.query.InteractiveQueryRequest;
-import org.apache.kafka.streams.query.InteractiveQueryResult;
+import org.apache.kafka.streams.query.StateQueryRequest;
+import org.apache.kafka.streams.query.StateQueryResult;
 import org.apache.kafka.streams.query.Iterators;
 import org.apache.kafka.streams.query.KeyQuery;
 import org.apache.kafka.streams.query.Position;
@@ -47,8 +46,6 @@ import org.apache.kafka.streams.query.RawKeyQuery;
 import org.apache.kafka.streams.query.RawScanQuery;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.QueryableStoreTypes;
-import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.WindowStore;
@@ -76,7 +73,7 @@ import java.util.stream.Collectors;
 
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
-import static org.apache.kafka.streams.query.InteractiveQueryRequest.inStore;
+import static org.apache.kafka.streams.query.StateQueryRequest.inStore;
 import static org.apache.kafka.streams.query.PositionBound.at;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
@@ -189,7 +186,7 @@ public class IQv2IntegrationTest {
             kafkaStreams.serdesForStore(CACHED_TABLE);
 
         final byte[] rawKey = serdes.rawKey(1);
-        final InteractiveQueryResult<byte[]> result = kafkaStreams.query(
+        final StateQueryResult<byte[]> result = kafkaStreams.query(
             inStore(CACHED_TABLE).withQuery(RawKeyQuery.withKey(rawKey)));
 
         System.out.println("|||" + result);
@@ -211,7 +208,7 @@ public class IQv2IntegrationTest {
             kafkaStreams.serdesForStore(UNCACHED_TABLE);
 
         final byte[] rawKey = serdes.rawKey(1);
-        final InteractiveQueryResult<byte[]> result = kafkaStreams.query(
+        final StateQueryResult<byte[]> result = kafkaStreams.query(
             inStore(UNCACHED_TABLE).withQuery(RawKeyQuery.withKey(rawKey)));
 
         System.out.println("|||" + result);
@@ -230,10 +227,10 @@ public class IQv2IntegrationTest {
     public void shouldQueryTypedKeyFromUncachedTable() {
         final Integer key = 1;
 
-        final InteractiveQueryRequest<ValueAndTimestamp<Integer>> query =
+        final StateQueryRequest<ValueAndTimestamp<Integer>> query =
             inStore(UNCACHED_TABLE).withQuery(KeyQuery.withKey(key));
 
-        final InteractiveQueryResult<ValueAndTimestamp<Integer>> result = 
kafkaStreams.query(query);
+        final StateQueryResult<ValueAndTimestamp<Integer>> result = 
kafkaStreams.query(query);
 
         final ValueAndTimestamp<Integer> value = 
result.getOnlyPartitionResult().getResult();
 
@@ -247,10 +244,10 @@ public class IQv2IntegrationTest {
     public void exampleKeyQueryIntoWindowStore() {
         final Windowed<Integer> key = new Windowed<>(1, new TimeWindow(0L, 
99L));
 
-        final InteractiveQueryRequest<ValueAndTimestamp<Long>> query =
+        final StateQueryRequest<ValueAndTimestamp<Long>> query =
             inStore(UNCACHED_COUNTS_TABLE).withQuery(KeyQuery.withKey(key));
 
-        final InteractiveQueryResult<ValueAndTimestamp<Long>> result = 
kafkaStreams.query(query);
+        final StateQueryResult<ValueAndTimestamp<Long>> result = 
kafkaStreams.query(query);
 
         final ValueAndTimestamp<Long> value = 
result.getOnlyPartitionResult().getResult();
 
@@ -263,7 +260,7 @@ public class IQv2IntegrationTest {
         final StateSerdes<Integer, ValueAndTimestamp<Integer>> serdes =
             kafkaStreams.serdesForStore(UNCACHED_TABLE);
 
-        final InteractiveQueryResult<KeyValueIterator<Bytes, byte[]>> scanResult =
+        final StateQueryResult<KeyValueIterator<Bytes, byte[]>> scanResult =
             kafkaStreams.query(inStore(UNCACHED_TABLE).withQuery(RawScanQuery.scan()));
 
         System.out.println("|||" + scanResult);
@@ -294,7 +291,7 @@ public class IQv2IntegrationTest {
         final StateSerdes<Integer, ValueAndTimestamp<Integer>> serdes =
             kafkaStreams.serdesForStore(UNCACHED_TABLE);
 
-        final InteractiveQueryResult<KeyValueIterator<Bytes, byte[]>> scanResult =
+        final StateQueryResult<KeyValueIterator<Bytes, byte[]>> scanResult =
             kafkaStreams.query(inStore(UNCACHED_TABLE).withQuery(RawScanQuery.scan()));
 
         System.out.println("|||" + scanResult);
@@ -330,7 +327,7 @@ public class IQv2IntegrationTest {
             kafkaStreams.serdesForStore(UNCACHED_TABLE);
 
         final byte[] rawKey = serdes.rawKey(1);
-        final InteractiveQueryResult<byte[]> result = kafkaStreams.query(
+        final StateQueryResult<byte[]> result = kafkaStreams.query(
             inStore(UNCACHED_TABLE)
                 .withQuery(RawKeyQuery.withKey(rawKey))
                 .withPositionBound(
@@ -361,7 +358,7 @@ public class IQv2IntegrationTest {
 
         final byte[] rawKey = serdes.rawKey(1);
         // intentionally setting the bound higher than the current position.
-        final InteractiveQueryResult<byte[]> result = kafkaStreams.query(
+        final StateQueryResult<byte[]> result = kafkaStreams.query(
             inStore(UNCACHED_TABLE)
                 .withQuery(RawKeyQuery.withKey(rawKey))
                 .withPositionBound(
@@ -399,7 +396,7 @@ public class IQv2IntegrationTest {
         final StateSerdes<Integer, ValueAndTimestamp<Integer>> serdes =
             kafkaStreams.serdesForStore(UNCACHED_TABLE);
 
-        final InteractiveQueryResult<KeyValueIterator<Bytes, byte[]>> scanResult =
+        final StateQueryResult<KeyValueIterator<Bytes, byte[]>> scanResult =
             kafkaStreams.query(
                 inStore(UNCACHED_TABLE)
                     .withQuery(RawScanQuery.scan())
diff --git a/streams/src/test/java/org/apache/kafka/streams/query/PositionBoundTest.java b/streams/src/test/java/org/apache/kafka/streams/query/PositionBoundTest.java
new file mode 100644
index 0000000..41175c2
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/query/PositionBoundTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.query;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+public class PositionBoundTest {
+
+    @Test
+    public void shouldCopyPosition() {
+        final Position position = Position.emptyPosition();
+        final PositionBound positionBound = PositionBound.at(position);
+        position.withComponent("topic", 1, 2L);
+
+        assertThat(position.getTopics(), equalTo(mkSet("topic")));
+        assertThat(positionBound.position().getTopics(), empty());
+    }
+
+    @Test
+    public void unboundedShouldBeUnbounded() {
+        final PositionBound bound = PositionBound.unbounded();
+        assertTrue(bound.isUnbounded());
+    }
+
+    @Test
+    public void unboundedShouldThrowOnPosition() {
+        final PositionBound bound = PositionBound.unbounded();
+        assertThrows(IllegalArgumentException.class, bound::position);
+    }
+
+    @Test
+    public void shouldEqualPosition() {
+        final PositionBound bound1 = PositionBound.at(Position.emptyPosition());
+        final PositionBound bound2 = PositionBound.at(Position.emptyPosition());
+        assertEquals(bound1, bound2);
+    }
+
+    @Test
+    public void shouldEqualUnbounded() {
+        final PositionBound bound1 = PositionBound.unbounded();
+        final PositionBound bound2 = PositionBound.unbounded();
+        assertEquals(bound1, bound2);
+    }
+
+    @Test
+    public void shouldEqualSelf() {
+        final PositionBound bound1 = PositionBound.at(Position.emptyPosition());
+        assertEquals(bound1, bound1);
+    }
+
+    @Test
+    public void shouldNotEqualNull() {
+        final PositionBound bound1 = PositionBound.at(Position.emptyPosition());
+        assertNotEquals(bound1, null);
+    }
+
+    @Test
+    public void shouldNotHash() {
+        final PositionBound bound = PositionBound.at(Position.emptyPosition());
+        assertThrows(UnsupportedOperationException.class, bound::hashCode);
+
+        // going overboard...
+        final HashSet<PositionBound> set = new HashSet<>();
+        assertThrows(UnsupportedOperationException.class, () -> set.add(bound));
+
+        final HashMap<PositionBound, Integer> map = new HashMap<>();
+        assertThrows(UnsupportedOperationException.class, () -> map.put(bound, 5));
+    }
+}

Reply via email to