vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771687546



##########
File path: streams/src/main/java/org/apache/kafka/streams/query/RangeQuery.java
##########
@@ -34,7 +34,7 @@
  * <p>
  */
 @Evolving
-public class RangeQuery<K, V> implements Query<KeyValueIterator<K, V>> {
+public final class RangeQuery<K, V> implements Query<KeyValueIterator<K, V>> {

Review comment:
       I had a review comment to add this to KeyQuery, so I added it to 
RangeQuery for exactly the same reason.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/PingQuery.java
##########
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.streams.query.Query;
-
-/**
- * A very simple query that all stores can handle to verify that the store is 
participating in the
- * IQv2 framework properly.
- * <p>
- * This is not a public API and may change without notice.
- */
-public class PingQuery implements Query<Boolean> {

Review comment:
       Removed, since it was only for validating the framework in the absence 
of any query implementations, and now we have query implementations.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##########
@@ -54,22 +55,22 @@
         );
     }
 
-    private static final Map<Class<?>, QueryHandler> QUERY_HANDLER_MAP =
+    @SuppressWarnings("rawtypes")
+    private static final Map<Class, QueryHandler> QUERY_HANDLER_MAP =
         mkMap(
-            mkEntry(
-                PingQuery.class,
-                (query, positionBound, collectExecutionInfo, store) -> 
QueryResult.forResult(true)
-            ),

Review comment:
       Removed see the comment on the PingQuery class.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##########
@@ -162,15 +163,39 @@ public static boolean isPermitted(
             }
             final R result = (R) iterator;
             return QueryResult.forResult(result);
-        } catch (final Throwable t) {
-            final String message = parseStoreException(t, store, query);
+        } catch (final Exception e) {

Review comment:
       Changed from Throwable to Exception to avoid swallowing Errors

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
##########
@@ -527,10 +541,11 @@ public void verifyStore() {
     private void globalShouldRejectAllQueries() {
         // See KAFKA-13523
 
-        final PingQuery query = new PingQuery();
-        final StateQueryRequest<Boolean> request = 
inStore(STORE_NAME).withQuery(query);
+        final KeyQuery<Integer, ValueAndTimestamp<Integer>> query = 
KeyQuery.withKey(1);

Review comment:
       Also replaced the PingQuery here. It also doesn't affect the evaluation.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
##########
@@ -630,29 +609,11 @@ public void shouldHandlePingQuery() {
                 .withQuery(query)
                 .withPartitions(mkSet(0, 1))
                 .withPositionBound(PositionBound.at(INPUT_POSITION));
-
         final StateQueryResult<KeyValueIterator<Integer, V>> result =
             IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
 
         if (result.getGlobalResult() != null) {
-            final QueryResult<KeyValueIterator<Integer, V>> queryResult = 
result.getGlobalResult();
-            final boolean failure = queryResult.isFailure();
-            if (failure) {
-                throw new AssertionError(queryResult.toString());
-            }
-            assertThat(queryResult.isSuccess(), is(true));
-
-            assertThrows(IllegalArgumentException.class, 
queryResult::getFailureReason);
-            assertThrows(IllegalArgumentException.class,
-                queryResult::getFailureMessage);
-
-            final KeyValueIterator<Integer, V> iterator = 
queryResult.getResult();
-            final Set<Integer> actualValue = new HashSet<>();
-            while (iterator.hasNext()) {
-                actualValue.add(valueExtactor.apply(iterator.next().value));
-            }
-            assertThat(actualValue, is(expectedValue));
-            assertThat(queryResult.getExecutionInfo(), is(empty()));
+            fail("global tables aren't implemented");

Review comment:
       This was covered in a prior PR. It will be fixed in 
https://issues.apache.org/jira/browse/KAFKA-13523

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java
##########
@@ -178,26 +180,26 @@ public static void after() {
 
     @Test
     public void shouldFailUnknownStore() {
-        final PingQuery query = new PingQuery();
-        final StateQueryRequest<Boolean> request =
+        final KeyQuery<Integer, ValueAndTimestamp<Integer>> query = 
KeyQuery.withKey(1);

Review comment:
       The query itself doesn't matter for these evaluations, so I just 
arbitrarily swapped KeyQuery in for PingQuery

##########
File path: streams/src/main/java/org/apache/kafka/streams/query/RangeQuery.java
##########
@@ -52,7 +52,6 @@ private RangeQuery(final Optional<K> lower, final Optional<K> 
upper) {
      * @param upper The key that specifies the upper bound of the range
      * @param <K> The key type
      * @param <V> The value type
-     * @return An iterator of records

Review comment:
       Noticed these, which are not correct. This method returns a query, not 
an iterator of records.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -209,42 +215,45 @@ public boolean setFlushListener(final 
CacheFlushListener<K, V> listener,
                                     final PositionBound positionBound,
                                     final boolean collectExecutionInfo) {
 
-        final long start = System.nanoTime();
+        final long start = time.nanoseconds();
         final QueryResult<R> result;
 
         final QueryHandler handler = queryHandlers.get(query.getClass());
         if (handler == null) {
             result = wrapped().query(query, positionBound, 
collectExecutionInfo);
             if (collectExecutionInfo) {
                 result.addExecutionInfo(
-                        "Handled in " + getClass() + " in " + 
(System.nanoTime() - start) + "ns");
+                    "Handled in " + getClass() + " in " + (time.nanoseconds() 
- start) + "ns");
             }
         } else {
             result = (QueryResult<R>) handler.apply(
-                    query,
-                    positionBound,
-                    collectExecutionInfo,
-                    this
+                query,
+                positionBound,
+                collectExecutionInfo,
+                this

Review comment:
       Sorry about this. The last PR contained some bad formatting, so I'm just 
biting the bullet and fixing it here.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to