vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r767856835



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -79,6 +88,14 @@
     private StreamsMetricsImpl streamsMetrics;
     private TaskId taskId;
 
+    private  Map<Class, QueryHandler> queryHandlers =
+        mkMap(
+            mkEntry(
+                KeyQuery.class,
+                (query, positionBound, collectExecutionInfo, store) -> runKeyQuery(query, positionBound, collectExecutionInfo)
+            )
+        );
+

Review comment:
       Just trying to establish a pattern here that lets us dispatch these 
queries efficiently. This O(1) map lookup should be faster than an O(n) 
if/else chain or an O(log n) string switch statement, but we won't know for 
sure without benchmarking.
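
For readers who want to see the dispatch pattern in isolation, here is a minimal sketch. It is not the PR's code: `DispatchSketch`, `KeyLookup`, `Ping`, and the string results are hypothetical stand-ins, and the "delegated" branch only stands in for the fall-through to `wrapped().query(...)`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-ins; these are not the Kafka Streams query classes.
class DispatchSketch {
    interface Query { }

    static final class KeyLookup implements Query {
        final String key;

        KeyLookup(final String key) {
            this.key = key;
        }
    }

    static final class Ping implements Query { }

    // Handlers are keyed by the exact runtime class of the query.
    private static final Map<Class<? extends Query>, Function<Query, String>> HANDLERS =
        new HashMap<>();

    static {
        HANDLERS.put(KeyLookup.class, q -> "key:" + ((KeyLookup) q).key);
    }

    static String dispatch(final Query query) {
        final Function<Query, String> handler = HANDLERS.get(query.getClass());
        if (handler == null) {
            // No handler registered: stands in for delegating to the wrapped store.
            return "delegated";
        }
        return handler.apply(query);
    }

    public static void main(final String[] args) {
        System.out.println(dispatch(new KeyLookup("a")));
        System.out.println(dispatch(new Ping()));
    }
}
```

One consequence of keying on `getClass()` is that the match is exact: a subclass of a registered query type would not find its parent's handler and would fall through to the delegate branch.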

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
##########
@@ -216,9 +269,17 @@ public boolean global() {
 
         public abstract StoreSupplier<?> supplier();
 
+        public boolean timestamped() {
+            return true; // most stores are timestamped
+        };
+
         public boolean global() {
             return false;
         }
+
+        public boolean keyValue() {
+            return false;
+        }

Review comment:
       These help us adjust our expectations in the validations below, so that 
we can cover all store types in the same test.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
##########
@@ -513,6 +590,43 @@ public void shouldHandlePingQuery() {
         assertThat(result.getPosition(), is(INPUT_POSITION));
     }
 
+    public <V> void shouldHandleKeyQuery(
+        final Integer key,
+        final Function<V, Integer> valueExtactor,
+        final Integer expectedValue) {
+
+        final KeyQuery<Integer, V> query = KeyQuery.withKey(key);
+        final StateQueryRequest<V> request =
+            inStore(STORE_NAME)
+                .withQuery(query)
+                .withPartitions(mkSet(0, 1))
+                .withPositionBound(PositionBound.at(INPUT_POSITION));
+
+        final StateQueryResult<V> result =
+            IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
+
+        final QueryResult<V> queryResult =
+            result.getGlobalResult() != null
+                ? result.getGlobalResult()
+                : result.getOnlyPartitionResult();
+        final boolean failure = queryResult.isFailure();
+        if (failure) {
+            throw new AssertionError(queryResult.toString());
+        }
+        assertThat(queryResult.isSuccess(), is(true));
+
+        assertThrows(IllegalArgumentException.class, queryResult::getFailureReason);
+        assertThrows(IllegalArgumentException.class,
+            queryResult::getFailureMessage);
+
+        final V result1 = queryResult.getResult();
+        final Integer integer = valueExtactor.apply(result1);
+        assertThat(integer, is(expectedValue));

Review comment:
       Here's where we run that function to either get the value out of the 
ValueAndTimestamp or just give back the value with the identity function.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/query/FailureReason.java
##########
@@ -52,5 +52,12 @@
      * The requested store partition does not exist at all. For example, partition 4 was requested,
      * but the store in question only has 4 partitions (0 through 3).
      */
-    DOES_NOT_EXIST;
+    DOES_NOT_EXIST,
+
+    /**
+     * The store that handled the query got an exception during query execution. The message
+     * will contain the exception details. Depending on the nature of the exception, the caller
+     * may be able to retry this instance or may need to try a different instance.
+     */
+    STORE_EXCEPTION;

Review comment:
       I realized in the implementation for RocksDB that we will need to 
account for runtime exceptions from the stores. I'll update the KIP.
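
To make the intent concrete, here is a rough sketch of turning a runtime exception into a failure result instead of letting it propagate. This is an assumption about the general shape, not the PR's implementation: `StoreExceptionSketch`, `Result`, `Reason`, and `runGuarded` are hypothetical names invented for this illustration.

```java
import java.util.function.Supplier;

// Hypothetical sketch; Result and Reason are not the Kafka Streams types.
class StoreExceptionSketch {
    enum Reason { STORE_EXCEPTION }

    static final class Result<R> {
        private final R value;
        private final Reason reason;
        private final String message;

        private Result(final R value, final Reason reason, final String message) {
            this.value = value;
            this.reason = reason;
            this.message = message;
        }

        static <R> Result<R> success(final R value) {
            return new Result<>(value, null, null);
        }

        static <R> Result<R> failure(final Reason reason, final String message) {
            return new Result<>(null, reason, message);
        }

        boolean isSuccess() {
            return reason == null;
        }

        R value() {
            return value;
        }

        Reason reason() {
            return reason;
        }

        String message() {
            return message;
        }
    }

    // Runs the store read and converts any RuntimeException into a failure
    // result whose message carries the exception details.
    static <R> Result<R> runGuarded(final Supplier<R> storeRead) {
        try {
            return Result.success(storeRead.get());
        } catch (final RuntimeException e) {
            return Result.failure(Reason.STORE_EXCEPTION, e.toString());
        }
    }

    public static void main(final String[] args) {
        final Result<String> ok = runGuarded(() -> "value");
        final Result<String> failed = runGuarded(() -> {
            throw new IllegalStateException("disk error");
        });
        System.out.println(ok.isSuccess());
        System.out.println(failed.reason() + ": " + failed.message());
    }
}
```

Returning the failure as data lets the caller inspect the reason and message and decide whether to retry the same instance or try another one.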

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
##########
@@ -426,6 +487,22 @@ public void verifyStore() {
             shouldHandlePingQuery();
             shouldCollectExecutionInfo();
             shouldCollectExecutionInfoUnderFailure();
+
+            if (storeToTest.keyValue()) {
+                if (storeToTest.timestamped()) {
+                    shouldHandleKeyQuery(
+                        2,
+                        (Function<ValueAndTimestamp<Integer>, Integer>) ValueAndTimestamp::value,
+                        2
+                    );
+                } else {
+                    shouldHandleKeyQuery(
+                        2,
+                        Function.identity(),
+                        2
+                    );
+                }
+            }

Review comment:
       Here's where we use those properties. KeyQueries are only implemented 
for KeyValue stores. For Timestamped stores, we get back a ValueAndTimestamp, 
which we extract the value from before making the assertion. Otherwise, we just 
get back the value and can assert directly on it.
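
The extractor idea can be shown on its own with a small sketch. `ExtractorSketch` and `Stamped` are hypothetical stand-ins (`Stamped` plays the role of ValueAndTimestamp); the point is only that one generic check can serve both store flavors when the caller supplies the right function.

```java
import java.util.function.Function;

// Hypothetical sketch; Stamped stands in for ValueAndTimestamp.
class ExtractorSketch {
    static final class Stamped<V> {
        private final V value;
        private final long timestamp;

        Stamped(final V value, final long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }

        V value() {
            return value;
        }

        long timestamp() {
            return timestamp;
        }
    }

    // Normalizes a query result of any shape down to the Integer we assert on.
    static <R> Integer extract(final R result, final Function<R, Integer> extractor) {
        return extractor.apply(result);
    }

    public static void main(final String[] args) {
        // Timestamped store: unwrap the value before asserting.
        final Stamped<Integer> stamped = new Stamped<>(2, 100L);
        System.out.println(extract(stamped, Stamped::value));

        // Non-timestamped store: the result already is the value.
        System.out.println(extract(2, Function.identity()));
    }
}
```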

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
         return false;
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+        final PositionBound positionBound,
+        final boolean collectExecutionInfo) {
+
+        final long start = System.nanoTime();
+        final QueryResult<R> result;
+
+        final QueryHandler handler = queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, collectExecutionInfo);
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " in " + (System.nanoTime() - start) + "ns");
+            }
+        } else {
+            result = (QueryResult<R>) handler.apply(
+                query,
+                positionBound,
+                collectExecutionInfo,
+                this
+            );
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " with serdes "
+                        + serdes + " in " + (System.nanoTime() - start) + "ns");
+            }
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    private <R> QueryResult<R> runKeyQuery(final Query query,
+        final PositionBound positionBound, final boolean collectExecutionInfo) {
+        final QueryResult<R> result;
+        final KeyQuery<K, V> typedQuery = (KeyQuery<K, V>) query;
+        final KeyQuery<Bytes, byte[]> rawKeyQuery = KeyQuery.withKey(keyBytes(typedQuery.getKey()));
+        final QueryResult<byte[]> rawResult =
+            wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo);
+        if (rawResult.isSuccess()) {
+            final boolean timestamped = WrappedStateStore.isTimestamped(wrapped());
+            final Serde<V> vSerde = serdes.valueSerde();
+            final Deserializer<V> deserializer;
+            if (!timestamped && vSerde instanceof ValueAndTimestampSerde) {
+                final ValueAndTimestampDeserializer valueAndTimestampDeserializer =
+                    (ValueAndTimestampDeserializer) ((ValueAndTimestampSerde) vSerde).deserializer();
+                deserializer = (Deserializer<V>) valueAndTimestampDeserializer.valueDeserializer;
+            } else {
+                deserializer = vSerde.deserializer();
+            }

Review comment:
       Note for the reviewers in case this is mysterious: one store we support, 
the non-timestamped RocksDB store (`Stores.persistentKeyValueStore`), returns 
just a raw value rather than a ValueAndTimestamp tuple. Streams has a store 
layer that inserts dummy timestamps so that everything conforms to the same 
types, but it's counter to the goals of IQv2 to spend time and memory copying 
the result arrays to add the timestamp only to strip it off again later (as in 
IQv1). Instead, we pass the non-timestamped value through from the bytes store 
and unwrap the ValueAndTimestamp serde so we can deserialize the raw value.
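
A stripped-down sketch of the unwrapping decision follows. All names here (`SerdeUnwrapSketch`, `Stamped`, `StampedDeserializer`, `choose`) are hypothetical, and the 8-byte timestamp prefix used by the wrapper is an assumption made for this sketch rather than a statement about the real serde's wire format.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch; these are not the Kafka Streams serde classes.
class SerdeUnwrapSketch {
    interface Deserializer<T> {
        T deserialize(byte[] data);
    }

    static final class Stamped<V> {
        final V value;
        final long timestamp;

        Stamped(final V value, final long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // Wrapper that expects an 8-byte timestamp followed by the value bytes
    // (an assumed layout for this sketch).
    static final class StampedDeserializer<V> implements Deserializer<Stamped<V>> {
        final Deserializer<V> valueDeserializer;

        StampedDeserializer(final Deserializer<V> valueDeserializer) {
            this.valueDeserializer = valueDeserializer;
        }

        @Override
        public Stamped<V> deserialize(final byte[] data) {
            final ByteBuffer buffer = ByteBuffer.wrap(data);
            final long timestamp = buffer.getLong();
            final byte[] rest = new byte[buffer.remaining()];
            buffer.get(rest);
            return new Stamped<>(valueDeserializer.deserialize(rest), timestamp);
        }
    }

    static final Deserializer<String> STRING =
        data -> new String(data, StandardCharsets.UTF_8);

    // If the underlying store is not timestamped but the configured
    // deserializer is the timestamped wrapper, use the inner value
    // deserializer so raw bytes can be read without adding a dummy timestamp.
    static Deserializer<?> choose(final Deserializer<?> configured,
                                  final boolean storeIsTimestamped) {
        if (!storeIsTimestamped && configured instanceof StampedDeserializer) {
            return ((StampedDeserializer<?>) configured).valueDeserializer;
        }
        return configured;
    }

    public static void main(final String[] args) {
        final Deserializer<?> configured = new StampedDeserializer<>(STRING);
        final byte[] rawBytes = "hello".getBytes(StandardCharsets.UTF_8);
        System.out.println(choose(configured, false).deserialize(rawBytes));
    }
}
```

The design choice mirrors the comment above: rather than padding the raw bytes so they fit the wrapper, the wrapper is peeled away once so the bytes can be deserialized as they are.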



