vvcephei commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r767856835
########## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java ##########

```diff
@@ -79,6 +88,14 @@
     private StreamsMetricsImpl streamsMetrics;
     private TaskId taskId;
 
+    private Map<Class, QueryHandler> queryHandlers =
+        mkMap(
+            mkEntry(
+                KeyQuery.class,
+                (query, positionBound, collectExecutionInfo, store) ->
+                    runKeyQuery(query, positionBound, collectExecutionInfo)
+            )
+        );
+
```

Review comment: Just trying to establish a pattern here that lets us dispatch these queries efficiently. This O(1) map lookup should be faster than an O(n) if/else chain or an O(log n) string switch statement, but we won't know for sure without benchmarking.

########## File path: streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java ##########

```diff
@@ -216,9 +269,17 @@ public boolean global() {
         public abstract StoreSupplier<?> supplier();
 
+        public boolean timestamped() {
+            return true; // most stores are timestamped
+        }
+
         public boolean global() {
             return false;
         }
+
+        public boolean keyValue() {
+            return false;
+        }
```

Review comment: These help us adjust our expectations in the validations below, so that we can cover all store types in the same test.

########## File path: streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java ##########

```diff
@@ -513,6 +590,43 @@ public void shouldHandlePingQuery() {
         assertThat(result.getPosition(), is(INPUT_POSITION));
     }
 
+    public <V> void shouldHandleKeyQuery(
+        final Integer key,
+        final Function<V, Integer> valueExtractor,
+        final Integer expectedValue) {
+
+        final KeyQuery<Integer, V> query = KeyQuery.withKey(key);
+        final StateQueryRequest<V> request =
+            inStore(STORE_NAME)
+                .withQuery(query)
+                .withPartitions(mkSet(0, 1))
+                .withPositionBound(PositionBound.at(INPUT_POSITION));
+
+        final StateQueryResult<V> result =
+            IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
+
+        final QueryResult<V> queryResult =
+            result.getGlobalResult() != null
+                ? result.getGlobalResult()
+                : result.getOnlyPartitionResult();
+        final boolean failure = queryResult.isFailure();
+        if (failure) {
+            throw new AssertionError(queryResult.toString());
+        }
+        assertThat(queryResult.isSuccess(), is(true));
+
+        assertThrows(IllegalArgumentException.class, queryResult::getFailureReason);
+        assertThrows(IllegalArgumentException.class,
+            queryResult::getFailureMessage);
+
+        final V result1 = queryResult.getResult();
+        final Integer integer = valueExtractor.apply(result1);
+        assertThat(integer, is(expectedValue));
```

Review comment: Here's where we run that function: it either extracts the value out of the ValueAndTimestamp or gives back the value unchanged via the identity function.

########## File path: streams/src/main/java/org/apache/kafka/streams/query/FailureReason.java ##########

```diff
@@ -52,5 +52,12 @@
      * The requested store partition does not exist at all. For example, partition 4 was requested,
      * but the store in question only has 4 partitions (0 through 3).
      */
-    DOES_NOT_EXIST;
+    DOES_NOT_EXIST,
+
+    /**
+     * The store that handled the query got an exception during query execution. The message
+     * will contain the exception details. Depending on the nature of the exception, the caller
+     * may be able to retry this instance or may need to try a different instance.
+     */
+    STORE_EXCEPTION;
```

Review comment: While implementing this for RocksDB, I realized that we will need to account for runtime exceptions from the stores. I'll update the KIP.
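The new `STORE_EXCEPTION` javadoc implies some caller-side decision logic: retry the same instance, or move on to a different one. As a hedged illustration (not part of the PR), here is a minimal sketch of how a caller might map failure reasons to a retry decision. The enum below is a simplified local stand-in; only `DOES_NOT_EXIST` and `STORE_EXCEPTION` appear in the diff above, and the other constants are illustrative assumptions about `org.apache.kafka.streams.query.FailureReason`.

```java
// Simplified, local stand-in for org.apache.kafka.streams.query.FailureReason.
// Only DOES_NOT_EXIST and STORE_EXCEPTION appear in the diff above; the other
// constants here are illustrative assumptions.
enum FailureReason { NOT_UP_TO_DATE, NOT_PRESENT, DOES_NOT_EXIST, STORE_EXCEPTION }

public class RetryPolicy {

    // Sketch: STORE_EXCEPTION may be transient (per the new javadoc), so a
    // caller could retry the same instance; DOES_NOT_EXIST can never succeed
    // on this instance, so retrying it there would be pointless.
    public static boolean shouldRetrySameInstance(final FailureReason reason) {
        switch (reason) {
            case NOT_UP_TO_DATE:   // the store may still catch up to the bound
            case STORE_EXCEPTION:  // possibly transient; worth another attempt
                return true;
            default:               // NOT_PRESENT, DOES_NOT_EXIST, ...
                return false;
        }
    }
}
```

A caller would feed the reason from a failed query result into logic like this, falling back to a different instance when retrying the same one is hopeless.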
########## File path: streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java ##########

```diff
@@ -426,6 +487,22 @@ public void verifyStore() {
         shouldHandlePingQuery();
         shouldCollectExecutionInfo();
         shouldCollectExecutionInfoUnderFailure();
+
+        if (storeToTest.keyValue()) {
+            if (storeToTest.timestamped()) {
+                shouldHandleKeyQuery(
+                    2,
+                    (Function<ValueAndTimestamp<Integer>, Integer>) ValueAndTimestamp::value,
+                    2
+                );
+            } else {
+                shouldHandleKeyQuery(
+                    2,
+                    Function.identity(),
+                    2
+                );
+            }
+        }
```

Review comment: Here's where we use those properties. KeyQueries are only implemented for KeyValue stores. For timestamped stores, we get back a ValueAndTimestamp, from which we extract the value before making the assertion; otherwise, we get back the bare value and can assert on it directly.

########## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java ##########

```diff
@@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
         return false;
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+                                    final PositionBound positionBound,
+                                    final boolean collectExecutionInfo) {
+
+        final long start = System.nanoTime();
+        final QueryResult<R> result;
+
+        final QueryHandler handler = queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, collectExecutionInfo);
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " in " + (System.nanoTime() - start) + "ns");
+            }
+        } else {
+            result = (QueryResult<R>) handler.apply(
+                query,
+                positionBound,
+                collectExecutionInfo,
+                this
+            );
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " with serdes "
+                        + serdes + " in " + (System.nanoTime() - start) + "ns");
+            }
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    private <R> QueryResult<R> runKeyQuery(final Query query,
+                                           final PositionBound positionBound,
+                                           final boolean collectExecutionInfo) {
+        final QueryResult<R> result;
+        final KeyQuery<K, V> typedQuery = (KeyQuery<K, V>) query;
+        final KeyQuery<Bytes, byte[]> rawKeyQuery = KeyQuery.withKey(keyBytes(typedQuery.getKey()));
+        final QueryResult<byte[]> rawResult =
+            wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo);
+        if (rawResult.isSuccess()) {
+            final boolean timestamped = WrappedStateStore.isTimestamped(wrapped());
+            final Serde<V> vSerde = serdes.valueSerde();
+            final Deserializer<V> deserializer;
+            if (!timestamped && vSerde instanceof ValueAndTimestampSerde) {
+                final ValueAndTimestampDeserializer valueAndTimestampDeserializer =
+                    (ValueAndTimestampDeserializer) ((ValueAndTimestampSerde) vSerde).deserializer();
+                deserializer = (Deserializer<V>) valueAndTimestampDeserializer.valueDeserializer;
+            } else {
+                deserializer = vSerde.deserializer();
+            }
```

Review comment: Note for reviewers, in case this is mysterious: we support one store, the non-timestamped RocksDB store (`Stores.persistentKeyValueStore`), which returns a raw value rather than a ValueAndTimestamp tuple. Streams, however, has a store layer that inserts dummy timestamps to make everything conform to the same types. Spending time and memory copying the result arrays to add a timestamp, only to strip it off again later (as IQv1 does), runs counter to the goals of IQv2, so instead we pass the non-timestamped value straight through from the bytes store and unwrap the ValueAndTimestamp serde so that we can deserialize the raw value.
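To make the dispatch pattern from the `MeteredKeyValueStore` diffs concrete in isolation: the sketch below reproduces only the shape of the class-keyed handler map (one O(1) hash lookup per query, falling through to the wrapped store for unregistered query types). All types here (`Query`, `KeyQuery`, `PingQuery`) are hypothetical stand-ins for this sketch, not the real `org.apache.kafka.streams.query` interfaces.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-ins for the real query types.
interface Query<R> { }
class KeyQuery implements Query<String> { }
class PingQuery implements Query<Boolean> { }

public class QueryDispatch {

    // Class-keyed handler map: dispatch cost is a single hash lookup,
    // regardless of how many query types get registered over time.
    private static final Map<Class<?>, Function<Query<?>, String>> HANDLERS =
        new HashMap<>();

    static {
        HANDLERS.put(KeyQuery.class, query -> "ran key query");
        HANDLERS.put(PingQuery.class, query -> "ran ping query");
    }

    public static String dispatch(final Query<?> query) {
        final Function<Query<?>, String> handler = HANDLERS.get(query.getClass());
        if (handler == null) {
            // Mirrors the PR: unknown query types are delegated downward.
            return "delegated to wrapped store";
        }
        return handler.apply(query);
    }
}
```

One caveat of keying on `query.getClass()`: a subclass of a registered query type misses the map and falls through to delegation, which is why concrete classes like `KeyQuery.class` are registered rather than relying on any inheritance hierarchy.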