vvcephei commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771687546
########## File path: streams/src/main/java/org/apache/kafka/streams/query/RangeQuery.java ########## @@ -34,7 +34,7 @@ * <p> */ @Evolving -public class RangeQuery<K, V> implements Query<KeyValueIterator<K, V>> { +public final class RangeQuery<K, V> implements Query<KeyValueIterator<K, V>> { Review comment: I had a review comment to add this to KeyQuery, so I added it to RangeQuery for exactly the same reason. ########## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/PingQuery.java ########## @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.state.internals; - -import org.apache.kafka.streams.query.Query; - -/** - * A very simple query that all stores can handle to verify that the store is participating in the - * IQv2 framework properly. - * <p> - * This is not a public API and may change without notice. - */ -public class PingQuery implements Query<Boolean> { Review comment: Removed, since it was only for validating the framework in the absence of any query implementations, and now we have query implementations. 
########## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java ########## @@ -54,22 +55,22 @@ ); } - private static final Map<Class<?>, QueryHandler> QUERY_HANDLER_MAP = + @SuppressWarnings("rawtypes") + private static final Map<Class, QueryHandler> QUERY_HANDLER_MAP = mkMap( - mkEntry( - PingQuery.class, - (query, positionBound, collectExecutionInfo, store) -> QueryResult.forResult(true) - ), Review comment: Removed; see the comment on the PingQuery class. ########## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java ########## @@ -162,15 +163,39 @@ public static boolean isPermitted( } final R result = (R) iterator; return QueryResult.forResult(result); - } catch (final Throwable t) { - final String message = parseStoreException(t, store, query); + } catch (final Exception e) { Review comment: Changed from Throwable to Exception to avoid swallowing Errors. ########## File path: streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java ########## @@ -527,10 +541,11 @@ public void verifyStore() { private void globalShouldRejectAllQueries() { // See KAFKA-13523 - final PingQuery query = new PingQuery(); - final StateQueryRequest<Boolean> request = inStore(STORE_NAME).withQuery(query); + final KeyQuery<Integer, ValueAndTimestamp<Integer>> query = KeyQuery.withKey(1); Review comment: Also replaced the PingQuery here. It also doesn't affect the evaluation. 
########## File path: streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java ########## @@ -630,29 +609,11 @@ public void shouldHandlePingQuery() { .withQuery(query) .withPartitions(mkSet(0, 1)) .withPositionBound(PositionBound.at(INPUT_POSITION)); - final StateQueryResult<KeyValueIterator<Integer, V>> result = IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request); if (result.getGlobalResult() != null) { - final QueryResult<KeyValueIterator<Integer, V>> queryResult = result.getGlobalResult(); - final boolean failure = queryResult.isFailure(); - if (failure) { - throw new AssertionError(queryResult.toString()); - } - assertThat(queryResult.isSuccess(), is(true)); - - assertThrows(IllegalArgumentException.class, queryResult::getFailureReason); - assertThrows(IllegalArgumentException.class, - queryResult::getFailureMessage); - - final KeyValueIterator<Integer, V> iterator = queryResult.getResult(); - final Set<Integer> actualValue = new HashSet<>(); - while (iterator.hasNext()) { - actualValue.add(valueExtactor.apply(iterator.next().value)); - } - assertThat(actualValue, is(expectedValue)); - assertThat(queryResult.getExecutionInfo(), is(empty())); + fail("global tables aren't implemented"); Review comment: This was covered in a prior PR. 
It will be fixed in https://issues.apache.org/jira/browse/KAFKA-13523 ########## File path: streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java ########## @@ -178,26 +180,26 @@ public static void after() { @Test public void shouldFailUnknownStore() { - final PingQuery query = new PingQuery(); - final StateQueryRequest<Boolean> request = + final KeyQuery<Integer, ValueAndTimestamp<Integer>> query = KeyQuery.withKey(1); Review comment: The query itself doesn't matter for these evaluations, so I just arbitrarily swapped KeyQuery in for PingQuery ########## File path: streams/src/main/java/org/apache/kafka/streams/query/RangeQuery.java ########## @@ -52,7 +52,6 @@ private RangeQuery(final Optional<K> lower, final Optional<K> upper) { * @param upper The key that specifies the upper bound of the range * @param <K> The key type * @param <V> The value type - * @return An iterator of records Review comment: Noticed these, which are not correct. This method returns a query, not an iterator of records. 
########## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java ########## @@ -209,42 +215,45 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener, final PositionBound positionBound, final boolean collectExecutionInfo) { - final long start = System.nanoTime(); + final long start = time.nanoseconds(); final QueryResult<R> result; final QueryHandler handler = queryHandlers.get(query.getClass()); if (handler == null) { result = wrapped().query(query, positionBound, collectExecutionInfo); if (collectExecutionInfo) { result.addExecutionInfo( - "Handled in " + getClass() + " in " + (System.nanoTime() - start) + "ns"); + "Handled in " + getClass() + " in " + (time.nanoseconds() - start) + "ns"); } } else { result = (QueryResult<R>) handler.apply( - query, - positionBound, - collectExecutionInfo, - this + query, + positionBound, + collectExecutionInfo, + this Review comment: Sorry about this. The last PR contained some bad formatting, so I'm just biting the bullet and fixing it here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org