[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
vvcephei commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771746009 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java ## @@ -267,17 +281,41 @@ public boolean setFlushListener(final CacheFlushListener listener, return result; } + @SuppressWarnings("unchecked") +private QueryResult runKeyQuery(final Query query, + final PositionBound positionBound, + final boolean collectExecutionInfo) { +final QueryResult result; +final KeyQuery typedKeyQuery = (KeyQuery) query; +final KeyQuery rawKeyQuery = +KeyQuery.withKey(keyBytes(typedKeyQuery.getKey())); +final QueryResult rawResult = +wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo); +if (rawResult.isSuccess()) { +final Deserializer deserializer = getValueDeserializer(); +final V value = deserializer.deserialize(serdes.topic(), rawResult.getResult()); +final QueryResult typedQueryResult = +rawResult.swapResult(value); +result = (QueryResult) typedQueryResult; +} else { +// the generic type doesn't matter, since failed queries have no result set. +result = (QueryResult) rawResult; +} +return result; +} + +@SuppressWarnings({"unchecked", "rawtypes"}) private Deserializer getValueDeserializer() { -final Serde vSerde = serdes.valueSerde(); +final Serde valueSerde = serdes.valueSerde(); final boolean timestamped = WrappedStateStore.isTimestamped(wrapped()); final Deserializer deserializer; -if (!timestamped && vSerde instanceof ValueAndTimestampSerde) { +if (!timestamped && valueSerde instanceof ValueAndTimestampSerde) { Review comment: I know it's weird, but it is correct. I would like to revisit it, but I think we really need to do that after the current round of queries are implemented. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
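The mail archive strips generic type parameters from the quoted diff, so here is a small, self-contained sketch of the translation step the comment is describing: the Metered layer serializes the query key for the inner bytes store and, on success, deserializes the raw bytes while keeping the result metadata via `swapResult`. The helper names and the standalone shape are illustrative assumptions; the real logic lives in `MeteredKeyValueStore.runKeyQuery` as quoted above.

```java
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.query.KeyQuery;
import org.apache.kafka.streams.query.QueryResult;

// Standalone approximation of the Metered-layer translation for KeyQuery.
final class MeteredKeyQuerySketch {

    // Serialize the typed key so the inner bytes store can look it up.
    static <K, V> KeyQuery<Bytes, byte[]> toRawQuery(final KeyQuery<K, V> typedQuery,
                                                     final Serializer<K> keySerializer,
                                                     final String topic) {
        final Bytes keyBytes = Bytes.wrap(keySerializer.serialize(topic, typedQuery.getKey()));
        return KeyQuery.withKey(keyBytes);
    }

    // Deserialize a successful raw result; swapResult keeps position and execution info.
    @SuppressWarnings("unchecked")
    static <R, V> QueryResult<R> toTypedResult(final QueryResult<byte[]> rawResult,
                                               final Deserializer<V> valueDeserializer,
                                               final String topic) {
        if (rawResult.isSuccess()) {
            final V value = valueDeserializer.deserialize(topic, rawResult.getResult());
            return (QueryResult<R>) rawResult.swapResult(value);
        } else {
            // The generic type doesn't matter: failed results carry no value.
            return (QueryResult<R>) rawResult;
        }
    }
}
```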
[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
vvcephei commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771745563 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java ## @@ -677,18 +638,56 @@ public void shouldHandlePingQuery() { } } +public void shouldHandleKeyQuery( +final Integer key, +final Function valueExtactor, +final Integer expectedValue) { + +final KeyQuery query = KeyQuery.withKey(key); +final StateQueryRequest request = +inStore(STORE_NAME) +.withQuery(query) +.withPartitions(mkSet(0, 1)) +.withPositionBound(PositionBound.at(INPUT_POSITION)); + +final StateQueryResult result = +IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request); + +final QueryResult queryResult = +result.getGlobalResult() != null Review comment: This line was written before I scratched global store support from the current scope. I'll drop the check from this test for now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
vvcephei commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771742317 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java ## @@ -162,15 +163,39 @@ public static boolean isPermitted( } final R result = (R) iterator; return QueryResult.forResult(result); -} catch (final Throwable t) { -final String message = parseStoreException(t, store, query); +} catch (final Exception e) { +final String message = parseStoreException(e, store, query); return QueryResult.forFailure( FailureReason.STORE_EXCEPTION, message ); } } +@SuppressWarnings("unchecked") +private static QueryResult runKeyQuery(final Query query, + final PositionBound positionBound, + final boolean collectExecutionInfo, + final StateStore store) { +if (store instanceof KeyValueStore) { +final KeyQuery rawKeyQuery = (KeyQuery) query; +final KeyValueStore keyValueStore = +(KeyValueStore) store; +try { +final byte[] bytes = keyValueStore.get(rawKeyQuery.getKey()); +return (QueryResult) QueryResult.forResult(bytes); Review comment: Thanks; let's keep that in mind as we tackle some of the API refactor tasks we've queued up. We started with the RawXQuery approach, then dropped it. Before we add it back, I think we'd better have a representative set of queries and also bear in mind all the other sharp edges we'd like to smooth over before release. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
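For readers following along, a hedged sketch of the byte-store-level execution path discussed in this thread: the store only checks that it is a KeyValueStore and returns raw bytes, and deserialization happens later in the Metered layer. The `UNKNOWN_QUERY_TYPE` constant and the failure message wording follow KIP-796 and are assumptions here; `STORE_EXCEPTION` appears in the diff above.

```java
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.query.FailureReason;
import org.apache.kafka.streams.query.KeyQuery;
import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.state.KeyValueStore;

// Sketch of StoreQueryUtils-style execution of a raw KeyQuery against a bytes store.
final class RawKeyQuerySketch {

    @SuppressWarnings("unchecked")
    static <R> QueryResult<R> runRawKeyQuery(final KeyQuery<Bytes, byte[]> query,
                                             final StateStore store) {
        if (!(store instanceof KeyValueStore)) {
            // This store cannot serve a key query at all.
            return QueryResult.forFailure(
                FailureReason.UNKNOWN_QUERY_TYPE,
                "Store " + store.getClass() + " does not handle KeyQuery"
            );
        }
        final KeyValueStore<Bytes, byte[]> kvStore = (KeyValueStore<Bytes, byte[]>) store;
        try {
            // Only raw bytes are fetched here; the Metered layer deserializes them.
            final byte[] bytes = kvStore.get(query.getKey());
            return (QueryResult<R>) QueryResult.forResult(bytes);
        } catch (final Exception e) {
            // Catch Exception (not Throwable) so that Errors are never swallowed.
            return QueryResult.forFailure(FailureReason.STORE_EXCEPTION, e.getMessage());
        }
    }
}
```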
[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
vvcephei commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771741301 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java ## @@ -253,12 +262,17 @@ public boolean setFlushListener(final CacheFlushListener listener, rawRangeQuery = RangeQuery.withNoBounds(); } final QueryResult> rawResult = -wrapped().query(rawRangeQuery, positionBound, collectExecutionInfo); +wrapped().query(rawRangeQuery, positionBound, collectExecutionInfo); if (rawResult.isSuccess()) { final KeyValueIterator iterator = rawResult.getResult(); final KeyValueIterator resultIterator = new MeteredKeyValueTimestampedIterator( -iterator, getSensor, getValueDeserializer()); -final QueryResult> typedQueryResult = QueryResult.forResult(resultIterator); +iterator, +getSensor, +getValueDeserializer() +); +final QueryResult> typedQueryResult = rawResult.swapResult( +resultIterator Review comment: Probably autoformatted because the line was too long. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
vvcephei commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771741132 ## File path: streams/src/main/java/org/apache/kafka/streams/query/RangeQuery.java ## @@ -34,7 +34,7 @@ * */ @Evolving -public class RangeQuery implements Query> { +public final class RangeQuery implements Query> { Review comment: https://issues.apache.org/jira/browse/KAFKA-13554 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
vvcephei commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771739597 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java ## @@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener listener, return false; } +@SuppressWarnings("unchecked") +@Override +public QueryResult query(final Query query, +final PositionBound positionBound, +final boolean collectExecutionInfo) { + +final long start = System.nanoTime(); +final QueryResult result; + +final QueryHandler handler = queryHandlers.get(query.getClass()); +if (handler == null) { +result = wrapped().query(query, positionBound, collectExecutionInfo); +if (collectExecutionInfo) { +result.addExecutionInfo( +"Handled in " + getClass() + " in " + (System.nanoTime() - start) + "ns"); +} +} else { +result = (QueryResult) handler.apply( +query, +positionBound, +collectExecutionInfo, +this +); +if (collectExecutionInfo) { +result.addExecutionInfo( +"Handled in " + getClass() + " with serdes " ++ serdes + " in " + (System.nanoTime() - start) + "ns"); +} +} +return result; +} + +@SuppressWarnings("unchecked") +private QueryResult runKeyQuery(final Query query, +final PositionBound positionBound, final boolean collectExecutionInfo) { +final QueryResult result; +final KeyQuery typedQuery = (KeyQuery) query; +final KeyQuery rawKeyQuery = KeyQuery.withKey(keyBytes(typedQuery.getKey())); +final QueryResult rawResult = +wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo); +if (rawResult.isSuccess()) { +final boolean timestamped = WrappedStateStore.isTimestamped(wrapped()); +final Serde vSerde = serdes.valueSerde(); +final Deserializer deserializer; +if (!timestamped && vSerde instanceof ValueAndTimestampSerde) { +final ValueAndTimestampDeserializer valueAndTimestampDeserializer = +(ValueAndTimestampDeserializer) ((ValueAndTimestampSerde) vSerde).deserializer(); +deserializer = (Deserializer) valueAndTimestampDeserializer.valueDeserializer; +} else { +deserializer = vSerde.deserializer(); +} Review comment: Sorry, I missed this thread before. I think these points are discussed on other threads in this PR, though. Tl;dr: I think we should aim to clean this up in https://issues.apache.org/jira/browse/KAFKA-13526 For now, I believe this logic is correct. However, it's good that you pointed out we're only testing all _dsl_ store combinations. I filed https://issues.apache.org/jira/browse/KAFKA-13553 to extend the IT to also test all _papi_ store combinations. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
vvcephei commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771737653 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java ## @@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener listener, return false; } +@SuppressWarnings("unchecked") +@Override +public QueryResult query(final Query query, +final PositionBound positionBound, +final boolean collectExecutionInfo) { + +final long start = System.nanoTime(); +final QueryResult result; + +final QueryHandler handler = queryHandlers.get(query.getClass()); +if (handler == null) { +result = wrapped().query(query, positionBound, collectExecutionInfo); +if (collectExecutionInfo) { Review comment: I'm getting the impression that you're not a huge fan of the phrasing of these messages. :) Can we tackle this question in a follow-on fashion? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
vvcephei commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771736938 ## File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java ## @@ -197,6 +197,18 @@ public R getResult() { return result; } +@SuppressWarnings("unchecked") +public QueryResult swapResult(final V value) { +if (isFailure()) { +return (QueryResult) this; +} else { +final QueryResult result = new QueryResult<>(value); Review comment: Thanks, @mjsax , I'm not sure precisely what you mean. This does create a new object. If you think it would be clearer to add a constructor allowing people to set the result along with a pre-populated executionInfo and position instead, we could, but this is the API we agreed on in the KIP. I want this new API to have good ergonomics, so I do want to consider these, but I don't think we need to hold up the KeyQuery PR on it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
vvcephei commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771735849 ## File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java ## @@ -197,6 +197,18 @@ public R getResult() { return result; } +@SuppressWarnings("unchecked") +public QueryResult swapResult(final V value) { +if (isFailure()) { +return (QueryResult) this; +} else { +final QueryResult result = new QueryResult<>(value); Review comment: Thanks, @guozhangwang , I think something like that will be the outcome of this follow-on work: https://issues.apache.org/jira/browse/KAFKA-13526 We'll tackle that question before the first release of this new API. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
vvcephei commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771735440 ## File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java ## @@ -197,6 +197,18 @@ public R getResult() { return result; } +@SuppressWarnings("unchecked") +public QueryResult swapResult(final V value) { +if (isFailure()) { +return (QueryResult) this; Review comment: sounds good! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
vvcephei commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771735231 ## File path: streams/src/main/java/org/apache/kafka/streams/query/RangeQuery.java ## @@ -34,7 +34,7 @@ * */ @Evolving -public class RangeQuery implements Query> { +public final class RangeQuery implements Query> { Review comment: Good point. We can rename it to KeyRangeQuery in a follow-on PR. I'll file a Jira. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
vvcephei commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771687546 ## File path: streams/src/main/java/org/apache/kafka/streams/query/RangeQuery.java ## @@ -34,7 +34,7 @@ * */ @Evolving -public class RangeQuery implements Query> { +public final class RangeQuery implements Query> { Review comment: I had a review comment to add this to KeyQuery, so I added it to RangeQuery for exactly the same reason. ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/PingQuery.java ## @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.state.internals; - -import org.apache.kafka.streams.query.Query; - -/** - * A very simple query that all stores can handle to verify that the store is participating in the - * IQv2 framework properly. - * - * This is not a public API and may change without notice. - */ -public class PingQuery implements Query { Review comment: Removed, since it was only for validating the framework in the absence of any query implementations, and now we have query implementations. ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java ## @@ -54,22 +55,22 @@ ); } -private static final Map, QueryHandler> QUERY_HANDLER_MAP = +@SuppressWarnings("rawtypes") +private static final Map QUERY_HANDLER_MAP = mkMap( -mkEntry( -PingQuery.class, -(query, positionBound, collectExecutionInfo, store) -> QueryResult.forResult(true) -), Review comment: Removed see the comment on the PingQuery class. ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java ## @@ -162,15 +163,39 @@ public static boolean isPermitted( } final R result = (R) iterator; return QueryResult.forResult(result); -} catch (final Throwable t) { -final String message = parseStoreException(t, store, query); +} catch (final Exception e) { Review comment: Changed from Throwable to Exception to avoid swallowing Errors ## File path: streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java ## @@ -527,10 +541,11 @@ public void verifyStore() { private void globalShouldRejectAllQueries() { // See KAFKA-13523 -final PingQuery query = new PingQuery(); -final StateQueryRequest request = inStore(STORE_NAME).withQuery(query); +final KeyQuery> query = KeyQuery.withKey(1); Review comment: Also replaced the PingQuery here. It also doesn't affect the evaluation. 
## File path: streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java ## @@ -630,29 +609,11 @@ public void shouldHandlePingQuery() { .withQuery(query) .withPartitions(mkSet(0, 1)) .withPositionBound(PositionBound.at(INPUT_POSITION)); - final StateQueryResult> result = IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request); if (result.getGlobalResult() != null) { -final QueryResult> queryResult = result.getGlobalResult(); -final boolean failure = queryResult.isFailure(); -if (failure) { -throw new AssertionError(queryResult.toString()); -} -assertThat(queryResult.isSuccess(), is(true)); - -assertThrows(IllegalArgumentException.class, queryResult::getFailureReason); -assertThrows(IllegalArgumentException.class, -queryResult::getFailureMessage); - -final KeyValueIterator iterator = queryResult.getResult(); -final Set actualValue = new HashSet<>(); -while (iterator.hasNext()) { -actualValue.add(valueExtactor.apply(iterator.next().value)); -} -assertThat(actualValue, is(expectedValue)); -assertThat(queryResult.getExecutionInfo(), is(empty())); +
[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
vvcephei commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771530728 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java ## @@ -42,16 +102,21 @@ private StoreQueryUtils() { final int partition ) { -final QueryResult result; final long start = collectExecutionInfo ? System.nanoTime() : -1L; -if (query instanceof PingQuery) { -if (!isPermitted(position, positionBound, partition)) { -result = QueryResult.notUpToBound(position, positionBound, partition); -} else { -result = (QueryResult) QueryResult.forResult(true); -} -} else { +final QueryResult result; + +final QueryHandler handler = QUERY_HANDLER_MAP.get(query.getClass()); Review comment: Yep, that's accurate, but many of the stores will have the exact same logic as each other, so it made sense to consolidate it, which is what this util class is for. The function in the query map just checks the type of the store so that it can either cast it to execute the query or return "unknown query". That way, we can use the same dispatch map for all queries. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
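A minimal, self-contained illustration of the class-keyed dispatch idea described above, using hypothetical query types rather than Kafka's: handlers are stored in a map keyed by the concrete query class, so lookup is O(1), and an unrecognized query type falls through to a default.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical types; only the dispatch pattern mirrors the discussion above.
public final class DispatchSketch {

    interface Query<R> { }

    static final class KeyQuery implements Query<byte[]> {
        final String key;
        KeyQuery(final String key) { this.key = key; }
    }

    static final class CountQuery implements Query<Long> { }

    @FunctionalInterface
    interface QueryHandler {
        Object handle(Query<?> query, Map<String, byte[]> store);
    }

    // One shared map: the key is the concrete query class, the value is its execution logic.
    private static final Map<Class<?>, QueryHandler> HANDLERS = new HashMap<>();
    static {
        HANDLERS.put(KeyQuery.class, (query, store) -> store.get(((KeyQuery) query).key));
        HANDLERS.put(CountQuery.class, (query, store) -> (long) store.size());
    }

    static Object execute(final Query<?> query, final Map<String, byte[]> store) {
        final QueryHandler handler = HANDLERS.get(query.getClass());
        if (handler == null) {
            throw new IllegalArgumentException("Unknown query type: " + query.getClass());
        }
        return handler.handle(query, store);
    }

    public static void main(final String[] args) {
        final Map<String, byte[]> store = new HashMap<>();
        store.put("a", new byte[] {1});
        System.out.println(execute(new CountQuery(), store)); // prints 1
    }
}
```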
[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
vvcephei commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771520958 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java ## @@ -16,18 +16,78 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.processor.api.RecordMetadata; +import org.apache.kafka.streams.query.FailureReason; +import org.apache.kafka.streams.query.KeyQuery; import org.apache.kafka.streams.query.Position; import org.apache.kafka.streams.query.PositionBound; import org.apache.kafka.streams.query.Query; import org.apache.kafka.streams.query.QueryResult; +import org.apache.kafka.streams.state.KeyValueStore; +import java.io.PrintWriter; +import java.io.StringWriter; import java.util.Map; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; + public final class StoreQueryUtils { +/** + * a utility interface to facilitate stores' query dispatch logic, + * allowing them to generically store query execution logic as the values + * in a map. + */ +@FunctionalInterface +public interface QueryHandler { +QueryResult apply( +final Query query, +final PositionBound positionBound, +final boolean collectExecutionInfo, +final StateStore store +); +} + + +@SuppressWarnings("unchecked") +private static final Map, QueryHandler> QUERY_HANDLER_MAP = +mkMap( +mkEntry( +PingQuery.class, +(query, positionBound, collectExecutionInfo, store) -> QueryResult.forResult(true) +), +mkEntry(KeyQuery.class, +(query, positionBound, collectExecutionInfo, store) -> { +if (store instanceof KeyValueStore) { +final KeyQuery rawKeyQuery = (KeyQuery) query; +final KeyValueStore keyValueStore = +(KeyValueStore) store; +try { +final byte[] bytes = keyValueStore.get(rawKeyQuery.getKey()); +return QueryResult.forResult(bytes); +} catch (final Throwable t) { Review comment: Good point. It's fine to catch Throwables, but it's not fine to swallow Errors, as I'm doing here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
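The distinction at stake in this comment: catching `Throwable` and converting it into a failure result would also swallow `Error`s such as `OutOfMemoryError`. A small sketch of the two safe alternatives, catching only `Exception` (what the PR ends up doing), or catching `Throwable` but rethrowing `Error`s:

```java
// Two ways to convert store exceptions into failure results without swallowing Errors.
final class ErrorHandlingSketch {

    // Option 1: catch Exception only, so Errors propagate naturally.
    static String callCatchingException(final Runnable storeCall) {
        try {
            storeCall.run();
            return "success";
        } catch (final Exception e) {
            return "failure: " + e.getMessage();
        }
    }

    // Option 2: catch Throwable, but rethrow anything that is an Error.
    static String callRethrowingErrors(final Runnable storeCall) {
        try {
            storeCall.run();
            return "success";
        } catch (final Throwable t) {
            if (t instanceof Error) {
                throw (Error) t;
            }
            return "failure: " + t.getMessage();
        }
    }
}
```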
[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
vvcephei commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771517463 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java ## @@ -16,18 +16,78 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.processor.api.RecordMetadata; +import org.apache.kafka.streams.query.FailureReason; +import org.apache.kafka.streams.query.KeyQuery; import org.apache.kafka.streams.query.Position; import org.apache.kafka.streams.query.PositionBound; import org.apache.kafka.streams.query.Query; import org.apache.kafka.streams.query.QueryResult; +import org.apache.kafka.streams.state.KeyValueStore; +import java.io.PrintWriter; +import java.io.StringWriter; import java.util.Map; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; + public final class StoreQueryUtils { +/** + * a utility interface to facilitate stores' query dispatch logic, + * allowing them to generically store query execution logic as the values + * in a map. + */ +@FunctionalInterface +public interface QueryHandler { +QueryResult apply( +final Query query, +final PositionBound positionBound, +final boolean collectExecutionInfo, +final StateStore store +); +} + + +@SuppressWarnings("unchecked") +private static final Map, QueryHandler> QUERY_HANDLER_MAP = +mkMap( +mkEntry( +PingQuery.class, Review comment: Actually, the PingQuery isn't in the KIP at all. I added it (as an internal API) so that I could verify the stores work properly in the absence of any actual queries (because I implemented the framework before any real queries, to control the scope of the PR). Now that we have real queries, I don't think we need to keep Ping around. I'll remove it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
vvcephei commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771515988 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java ## @@ -16,18 +16,78 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.processor.api.RecordMetadata; +import org.apache.kafka.streams.query.FailureReason; +import org.apache.kafka.streams.query.KeyQuery; import org.apache.kafka.streams.query.Position; import org.apache.kafka.streams.query.PositionBound; import org.apache.kafka.streams.query.Query; import org.apache.kafka.streams.query.QueryResult; +import org.apache.kafka.streams.state.KeyValueStore; +import java.io.PrintWriter; +import java.io.StringWriter; import java.util.Map; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; + public final class StoreQueryUtils { +/** + * a utility interface to facilitate stores' query dispatch logic, + * allowing them to generically store query execution logic as the values + * in a map. + */ +@FunctionalInterface +public interface QueryHandler { +QueryResult apply( +final Query query, +final PositionBound positionBound, +final boolean collectExecutionInfo, +final StateStore store +); +} + + +@SuppressWarnings("unchecked") +private static final Map, QueryHandler> QUERY_HANDLER_MAP = Review comment: They both exist to dispatch query execution logic. The MeteredStores' logic is to translate results from the inner stores, and the inner stores' logic is to execute the query. Since we have a lot of functionally identical stores (i.e., many KeyValue stores, etc.), it made sense to consolidate their execution logic here instead of duplicating it in every store class. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
vvcephei commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771513674 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java ## @@ -16,18 +16,78 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.processor.api.RecordMetadata; +import org.apache.kafka.streams.query.FailureReason; +import org.apache.kafka.streams.query.KeyQuery; import org.apache.kafka.streams.query.Position; import org.apache.kafka.streams.query.PositionBound; import org.apache.kafka.streams.query.Query; import org.apache.kafka.streams.query.QueryResult; +import org.apache.kafka.streams.state.KeyValueStore; +import java.io.PrintWriter; +import java.io.StringWriter; import java.util.Map; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; + public final class StoreQueryUtils { +/** + * a utility interface to facilitate stores' query dispatch logic, + * allowing them to generically store query execution logic as the values + * in a map. + */ +@FunctionalInterface Review comment: The compiler is smart enough. It's just an informative annotation. Its only practical purpose is to raise compilation error if you try to declare more than one method in it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
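For illustration, `@FunctionalInterface` changes nothing at runtime; it only makes the compiler reject a second abstract method, while lambdas work against any single-abstract-method interface with or without the annotation:

```java
// The annotation is informative: declaring a second abstract method becomes a compile error.
@FunctionalInterface
interface Handler {
    String handle(String input);

    // Uncommenting this would fail compilation because of @FunctionalInterface:
    // String handleTwice(String input);

    // default and static methods are still permitted.
    default String handleUpper(final String input) {
        return handle(input).toUpperCase();
    }
}

class FunctionalInterfaceSketch {
    public static void main(final String[] args) {
        final Handler reverse = s -> new StringBuilder(s).reverse().toString();
        System.out.println(reverse.handle("kafka"));      // akfak
        System.out.println(reverse.handleUpper("kafka")); // AKFAK
    }
}
```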
[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
vvcephei commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771512115 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java ## @@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener listener, return false; } +@SuppressWarnings("unchecked") +@Override +public QueryResult query(final Query query, +final PositionBound positionBound, +final boolean collectExecutionInfo) { + +final long start = System.nanoTime(); +final QueryResult result; + +final QueryHandler handler = queryHandlers.get(query.getClass()); +if (handler == null) { +result = wrapped().query(query, positionBound, collectExecutionInfo); +if (collectExecutionInfo) { +result.addExecutionInfo( +"Handled in " + getClass() + " in " + (System.nanoTime() - start) + "ns"); +} +} else { +result = (QueryResult) handler.apply( +query, +positionBound, +collectExecutionInfo, +this +); +if (collectExecutionInfo) { +result.addExecutionInfo( +"Handled in " + getClass() + " with serdes " ++ serdes + " in " + (System.nanoTime() - start) + "ns"); +} +} +return result; +} + +@SuppressWarnings("unchecked") +private QueryResult runKeyQuery(final Query query, +final PositionBound positionBound, final boolean collectExecutionInfo) { +final QueryResult result; +final KeyQuery typedQuery = (KeyQuery) query; +final KeyQuery rawKeyQuery = KeyQuery.withKey(keyBytes(typedQuery.getKey())); +final QueryResult rawResult = +wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo); +if (rawResult.isSuccess()) { +final boolean timestamped = WrappedStateStore.isTimestamped(wrapped()); Review comment: I'd forgotten about MeteredTimestampedKeyValueStore, but now that I'm looking at it, what it does is extend the MeteredKeyValueStore, apparently specifically to pad the value serde with a ValueAndTimestamp serde. Otherwise, all the logic lives in MeteredKeyValueStore. Also, because we always wrap the non-timestamped store with the `KeyValueToTimestampedKeyValueByteStoreAdapter`, we also always pass through the MeteredTimestampedKeyValue store whether the inner store is really timestamped or not. I think we could clean this whole hierarchy up a bit, but it's not necessary as part of this work. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
vvcephei commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771505654 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java ## @@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener listener, return false; } +@SuppressWarnings("unchecked") +@Override +public QueryResult query(final Query query, +final PositionBound positionBound, +final boolean collectExecutionInfo) { + +final long start = System.nanoTime(); +final QueryResult result; + +final QueryHandler handler = queryHandlers.get(query.getClass()); +if (handler == null) { +result = wrapped().query(query, positionBound, collectExecutionInfo); +if (collectExecutionInfo) { +result.addExecutionInfo( +"Handled in " + getClass() + " in " + (System.nanoTime() - start) + "ns"); +} +} else { +result = (QueryResult) handler.apply( +query, +positionBound, +collectExecutionInfo, +this +); +if (collectExecutionInfo) { +result.addExecutionInfo( +"Handled in " + getClass() + " with serdes " ++ serdes + " in " + (System.nanoTime() - start) + "ns"); +} +} +return result; +} + +@SuppressWarnings("unchecked") +private QueryResult runKeyQuery(final Query query, +final PositionBound positionBound, final boolean collectExecutionInfo) { +final QueryResult result; +final KeyQuery typedQuery = (KeyQuery) query; +final KeyQuery rawKeyQuery = KeyQuery.withKey(keyBytes(typedQuery.getKey())); +final QueryResult rawResult = +wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo); +if (rawResult.isSuccess()) { +final boolean timestamped = WrappedStateStore.isTimestamped(wrapped()); +final Serde vSerde = serdes.valueSerde(); +final Deserializer deserializer; +if (!timestamped && vSerde instanceof ValueAndTimestampSerde) { Review comment: Yes, this is super weird, and I think that https://issues.apache.org/jira/browse/KAFKA-13526 will give us a more elegant way to correct it, but as it stands right now, this is necessary. The MeteredStore's serde is always a ValueAndTimestamp serde regardless of whether the inner store is Timestamped or not. This works because the normal execution flow actually converts the byte results from non-timestamped stores into the binary schema of a ValueAndTimestamp. What we do is, when you have a non-timestamped store, we wrap it with an extra layer (`org.apache.kafka.streams.state.internals.KeyValueToTimestampedKeyValueByteStoreAdapter`) that pads the returned values with a fake timestamp (`org.apache.kafka.streams.state.TimestampedBytesStore#convertToTimestampedFormat`). That makes sense when the store is used by processors (particularly the ones in the DSL) because it makes the store configuration orthogonal to the processor logic, but for IQ, it's just spending extra time and memory for no productive purpose. One of the primary design goals of IQv2 is to make query execution as lean as possible, so we did not implement the same padding logic for non-timestamped data and instead just bubble up to the MeteredStore the actual byte array returned from the BytesStore. Which means that if we want to deserialize it, we need to know whether to use the ValueAndTimestamp deserializer or just the Value's deserializer. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
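As an illustration of the trade-off described above: wrapping a non-timestamped store means every read pays for copying the value into a timestamped layout, which IQv2 skips, so the Metered layer must know which deserializer to apply. The 8-byte timestamp-prefix layout shown here is an assumption made for the sake of the example; the real classes are `ValueAndTimestampSerde`, `KeyValueToTimestampedKeyValueByteStoreAdapter`, and `TimestampedBytesStore#convertToTimestampedFormat`.

```java
import java.nio.ByteBuffer;

// Illustration only: padding a plain value into a "timestamped" layout
// (a long timestamp followed by the value bytes). The exact byte layout used by
// Kafka Streams is assumed here, not quoted from the source.
final class TimestampPaddingSketch {

    static byte[] padWithTimestamp(final long timestamp, final byte[] plainValue) {
        return ByteBuffer.allocate(Long.BYTES + plainValue.length)
            .putLong(timestamp)
            .put(plainValue)
            .array();
    }

    // If the bytes are NOT padded (the IQv2 fast path for non-timestamped stores),
    // they must go to the plain value deserializer rather than the ValueAndTimestamp
    // deserializer, hence the instanceof check in the Metered store.
    static byte[] stripTimestamp(final byte[] paddedValue) {
        final ByteBuffer buffer = ByteBuffer.wrap(paddedValue);
        buffer.getLong(); // skip the timestamp prefix
        final byte[] value = new byte[buffer.remaining()];
        buffer.get(value);
        return value;
    }
}
```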
[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
vvcephei commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771495764 ## File path: streams/src/main/java/org/apache/kafka/streams/query/KeyQuery.java ## @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.query; + +import org.apache.kafka.common.annotation.InterfaceStability.Evolving; + +@Evolving +public class KeyQuery implements Query { Review comment: Good idea. I'm not sure whether it will ultimately be good to extend queries with other queries later, but it doesn't hurt to add this now so that we can make an explicit decision about it later. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
vvcephei commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771494393 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java ## @@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener listener, return false; } +@SuppressWarnings("unchecked") +@Override +public QueryResult query(final Query query, +final PositionBound positionBound, +final boolean collectExecutionInfo) { + +final long start = System.nanoTime(); +final QueryResult result; + +final QueryHandler handler = queryHandlers.get(query.getClass()); +if (handler == null) { +result = wrapped().query(query, positionBound, collectExecutionInfo); +if (collectExecutionInfo) { +result.addExecutionInfo( +"Handled in " + getClass() + " in " + (System.nanoTime() - start) + "ns"); +} +} else { +result = (QueryResult) handler.apply( +query, +positionBound, +collectExecutionInfo, +this +); +if (collectExecutionInfo) { +result.addExecutionInfo( +"Handled in " + getClass() + " with serdes " ++ serdes + " in " + (System.nanoTime() - start) + "ns"); +} +} +return result; +} + +@SuppressWarnings("unchecked") +private QueryResult runKeyQuery(final Query query, +final PositionBound positionBound, final boolean collectExecutionInfo) { Review comment: Sorry about that; oversight. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
vvcephei commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771493968 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java ## @@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener listener, return false; } +@SuppressWarnings("unchecked") +@Override +public QueryResult query(final Query query, +final PositionBound positionBound, +final boolean collectExecutionInfo) { + +final long start = System.nanoTime(); +final QueryResult result; + +final QueryHandler handler = queryHandlers.get(query.getClass()); +if (handler == null) { +result = wrapped().query(query, positionBound, collectExecutionInfo); +if (collectExecutionInfo) { +result.addExecutionInfo( +"Handled in " + getClass() + " in " + (System.nanoTime() - start) + "ns"); +} +} else { +result = (QueryResult) handler.apply( +query, +positionBound, +collectExecutionInfo, +this +); +if (collectExecutionInfo) { +result.addExecutionInfo( +"Handled in " + getClass() + " with serdes " ++ serdes + " in " + (System.nanoTime() - start) + "ns"); Review comment: Thanks; this seems about the same, and it would apply to all the other execution info messages we've got, so I think I'll keep it the same for now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
vvcephei commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771492764 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java ## @@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener listener, return false; } +@SuppressWarnings("unchecked") +@Override +public QueryResult query(final Query query, +final PositionBound positionBound, +final boolean collectExecutionInfo) { + +final long start = System.nanoTime(); +final QueryResult result; + +final QueryHandler handler = queryHandlers.get(query.getClass()); +if (handler == null) { +result = wrapped().query(query, positionBound, collectExecutionInfo); +if (collectExecutionInfo) { Review comment: Yeah, the idea was to actually be able to see everything that happened during query execution, specifically to demystify what's going on when you're debugging. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
vvcephei commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771490884 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java ## @@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener listener, return false; } +@SuppressWarnings("unchecked") +@Override +public QueryResult query(final Query query, +final PositionBound positionBound, +final boolean collectExecutionInfo) { + +final long start = System.nanoTime(); Review comment: Not a bad idea! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
vvcephei commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771472556 ## File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java ## @@ -197,6 +197,18 @@ public R getResult() { return result; } +@SuppressWarnings("unchecked") +public QueryResult swapResult(final V value) { +if (isFailure()) { +return (QueryResult) this; +} else { +final QueryResult result = new QueryResult<>(value); Review comment: What's happening here is that we're turning a `QueryResult` of one value type into a `QueryResult` of another. A concrete example (in fact the only use case) of this is in the MeteredStore: we get back a raw result from the BytesStore and need to deserialize it, so we need to convert the raw `QueryResult` over bytes into a `QueryResult` over the deserialized value type. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
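A compact sketch of that point: building a fresh result with `QueryResult.forResult` on the deserialized value would drop whatever position and execution info the inner store attached, whereas `swapResult` carries that metadata along and only replaces the value. The String/UTF-8 deserialization below is a placeholder, not the store's real serde.

```java
import java.nio.charset.StandardCharsets;
import org.apache.kafka.streams.query.QueryResult;

// swapResult keeps the metadata collected by the bytes store; only the value changes.
final class SwapResultSketch {

    static QueryResult<String> deserialize(final QueryResult<byte[]> rawResult) {
        final String value = new String(rawResult.getResult(), StandardCharsets.UTF_8);
        return rawResult.swapResult(value);
    }
}
```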
[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
vvcephei commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771470456 ## File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java ## @@ -197,6 +197,18 @@ public R getResult() { return result; } +@SuppressWarnings("unchecked") +public QueryResult swapResult(final V value) { +if (isFailure()) { +return (QueryResult) this; Review comment: In the case of a failure, there is no result, just the failure message. I wanted to maintain an invariant that there is always either a failure or a result, but not both or neither. I also didn't think it would be right to allow accidentally converting a failure to a successful result via this method. I suppose we could throw an IllegalStateException, since the caller probably shouldn't be even trying to "swap" the result on a failed result to begin with, but this seems fine, too. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
vvcephei commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771466959 ## File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java ## @@ -29,10 +29,10 @@ */ public final class QueryResult { -private final List executionInfo = new LinkedList<>(); private final FailureReason failureReason; private final String failure; private final R result; +private List executionInfo = new LinkedList<>(); Review comment: Thanks; that would be another way to do it. I'm not sure if that would be clearly better or not, though. It's for getting more details about how the query was actually executed inside of Streams. Right now, if you request it as part of the query, each store layer will report what it did and how long it took. For runtime queries, you wouldn't want to use it, but I wanted to enable debugging in the cases where query execution seems like it's taking longer than expected. Also, it could be used for tracing, in which every Nth query is run with the execution info on. It's a list of Strings so that each store layer / operation can just add one "line" of info (like a stack trace), but we don't waste time and memory actually concatenating them with newlines. We considered adding more structure (such as having a field for execution time), but kept it as a string so as not to restrict the kind of "execution information" we might find useful to add in the future. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
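A usage sketch of the opt-in execution info described above, for example while debugging a slow lookup or tracing every Nth query. The builder method names (`enableExecutionInfo` in particular) follow KIP-796 and should be treated as assumptions here, not as quotes from this PR.

```java
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.query.KeyQuery;
import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.query.StateQueryRequest;
import org.apache.kafka.streams.query.StateQueryResult;

import static org.apache.kafka.streams.query.StateQueryRequest.inStore;

// Sketch: request execution info for a single query and print it afterwards.
final class ExecutionInfoSketch {

    static void debugQuery(final KafkaStreams kafkaStreams) {
        final StateQueryRequest<Integer> request =
            inStore("my-store")
                .withQuery(KeyQuery.<Integer, Integer>withKey(1))
                .enableExecutionInfo(); // off by default, since collecting it costs time and memory

        final StateQueryResult<Integer> result = kafkaStreams.query(request);
        final QueryResult<Integer> partitionResult = result.getOnlyPartitionResult();

        // One "line" per store layer / operation, similar to a stack trace.
        partitionResult.getExecutionInfo().forEach(System.out::println);
    }
}
```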
[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
vvcephei commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771459374 ## File path: streams/src/main/java/org/apache/kafka/streams/query/FailureReason.java ## @@ -52,5 +52,12 @@ * The requested store partition does not exist at all. For example, partition 4 was requested, * but the store in question only has 4 partitions (0 through 3). */ -DOES_NOT_EXIST; +DOES_NOT_EXIST, + +/** + * The store that handled the query got an exception during query execution. The message + * will contain the exception details. Depending on the nature of the exception, the caller + * may be able to retry this instance or may need to try a different instance. + */ +STORE_EXCEPTION; Review comment: Thanks! @guozhangwang reminded me during the discussion to make sure that all the cases in that KIP were accounted for. Some are still exceptions, and some are now FailureReasons: https://lists.apache.org/thread/brvwvpvsbsfvqpqg6jvry5hqny0vm2tr -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
vvcephei commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771454005 ## File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java ## @@ -197,6 +197,18 @@ public R getResult() { return result; } +@SuppressWarnings("unchecked") +public QueryResult swapResult(final V value) { Review comment: The purpose of this method is to allow `MeteredKeyValue` store to deserialize the result without wiping out the execution info or position that it got back from the bytes store. I missed that while reviewing your PR, so I went ahead and added a fix for it to this one. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
vvcephei commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r767856835 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java ## @@ -79,6 +88,14 @@ private StreamsMetricsImpl streamsMetrics; private TaskId taskId; +private Map queryHandlers = +mkMap( +mkEntry( +KeyQuery.class, +(query, positionBound, collectExecutionInfo, store) -> runKeyQuery(query, positionBound, collectExecutionInfo) +) +); + Review comment: Just trying to establish some pattern here that can let us dispatch these queries efficiently. This O(1) lookup should be faster than an O(n) if/else check or an O(log n) string switch statement, but we won't know for sure without benchmarking. ## File path: streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java ## @@ -216,9 +269,17 @@ public boolean global() { public abstract StoreSupplier supplier(); +public boolean timestamped() { +return true; // most stores are timestamped +}; + public boolean global() { return false; } + +public boolean keyValue() { +return false; +} Review comment: These help us adjust our expectations in the validations below, so that we can cover all store types in the same test. ## File path: streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java ## @@ -513,6 +590,43 @@ public void shouldHandlePingQuery() { assertThat(result.getPosition(), is(INPUT_POSITION)); } +public void shouldHandleKeyQuery( +final Integer key, +final Function valueExtactor, +final Integer expectedValue) { + +final KeyQuery query = KeyQuery.withKey(key); +final StateQueryRequest request = +inStore(STORE_NAME) +.withQuery(query) +.withPartitions(mkSet(0, 1)) +.withPositionBound(PositionBound.at(INPUT_POSITION)); + +final StateQueryResult result = +IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request); + +final QueryResult queryResult = +result.getGlobalResult() != null +? result.getGlobalResult() +: result.getOnlyPartitionResult(); +final boolean failure = queryResult.isFailure(); +if (failure) { +throw new AssertionError(queryResult.toString()); +} +assertThat(queryResult.isSuccess(), is(true)); + +assertThrows(IllegalArgumentException.class, queryResult::getFailureReason); +assertThrows(IllegalArgumentException.class, +queryResult::getFailureMessage); + +final V result1 = queryResult.getResult(); +final Integer integer = valueExtactor.apply(result1); +assertThat(integer, is(expectedValue)); Review comment: Here's where we run that function to either get the value out of the ValueAndTimestamp or just give back the value with the identity function. ## File path: streams/src/main/java/org/apache/kafka/streams/query/FailureReason.java ## @@ -52,5 +52,12 @@ * The requested store partition does not exist at all. For example, partition 4 was requested, * but the store in question only has 4 partitions (0 through 3). */ -DOES_NOT_EXIST; +DOES_NOT_EXIST, + +/** + * The store that handled the query got an exception during query execution. The message + * will contain the exception details. Depending on the nature of the exception, the caller + * may be able to retry this instance or may need to try a different instance. + */ +STORE_EXCEPTION; Review comment: I realized in the implementation for RocksDB that we will need to account for runtime exceptions from the stores. I'll update the KIP. 
## File path: streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java ## @@ -426,6 +487,22 @@ public void verifyStore() { shouldHandlePingQuery(); shouldCollectExecutionInfo(); shouldCollectExecutionInfoUnderFailure(); + +if (storeToTest.keyValue()) { +if (storeToTest.timestamped()) { +shouldHandleKeyQuery( +2, +(Function, Integer>) ValueAndTimestamp::value, +2 +); +} else { +shouldHandleKeyQuery( +2, +Function.identity(), +2 +); +} +} Review comment: Here's where we use those properties. KeyQueries are only implemented for K
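To round out the test-parameterization idea above (reusing one assertion path for timestamped and plain key-value stores), here is a small sketch of the extractor-function approach; the helper below is illustrative, not the test's actual code.

```java
import java.util.function.Function;
import org.apache.kafka.streams.state.ValueAndTimestamp;

// Sketch: one assertion helper serves both result shapes via a value extractor.
final class ValueExtractorSketch {

    static <V> void assertKeyQueryResult(final V result,
                                         final Function<V, Integer> valueExtractor,
                                         final Integer expectedValue) {
        final Integer actual = valueExtractor.apply(result);
        if (!expectedValue.equals(actual)) {
            throw new AssertionError("Expected " + expectedValue + " but got " + actual);
        }
    }

    static void examples() {
        // Timestamped store: the result is a ValueAndTimestamp<Integer>, so unwrap it.
        assertKeyQueryResult(ValueAndTimestamp.make(2, 42L), ValueAndTimestamp::value, 2);

        // Plain key-value store: the result is already the Integer value.
        assertKeyQueryResult(2, Function.identity(), 2);
    }
}
```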
[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
vvcephei commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r767312544 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java ## @@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener listener, return false; } +@SuppressWarnings("unchecked") +@Override +public QueryResult query(final Query query, +final PositionBound positionBound, +final boolean collectExecutionInfo) { + +final long start = System.nanoTime(); +final QueryResult result; + +final QueryHandler handler = queryHandlers.get(query.getClass()); +if (handler == null) { +result = wrapped().query(query, positionBound, collectExecutionInfo); +if (collectExecutionInfo) { +result.addExecutionInfo( +"Handled in " + getClass() + " in " + (System.nanoTime() - start) + "ns"); +} +} else { +result = (QueryResult) handler.apply( +query, +positionBound, +collectExecutionInfo, +this +); +if (collectExecutionInfo) { +result.addExecutionInfo( +"Handled in " + getClass() + " with serdes " ++ serdes + " in " + (System.nanoTime() - start) + "ns"); +} +} +return result; +} + +@SuppressWarnings("unchecked") +private QueryResult runKeyQuery(final Query query, +final PositionBound positionBound, final boolean collectExecutionInfo) { +final QueryResult result; +final KeyQuery typedQuery = (KeyQuery) query; +final KeyQuery rawKeyQuery = KeyQuery.withKey(keyBytes(typedQuery.getKey())); +final QueryResult rawResult = +wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo); +if (rawResult.isSuccess()) { +final boolean timestamped = WrappedStateStore.isTimestamped(wrapped()); +final Serde vSerde = serdes.valueSerde(); +final Deserializer deserializer; +if (!timestamped && vSerde instanceof ValueAndTimestampSerde) { +final ValueAndTimestampDeserializer valueAndTimestampDeserializer = +(ValueAndTimestampDeserializer) ((ValueAndTimestampSerde) vSerde).deserializer(); +deserializer = (Deserializer) valueAndTimestampDeserializer.valueDeserializer; +} else { +deserializer = vSerde.deserializer(); +} Review comment: This is a bit of a hack. I think we can possibly do a better job of this in the Timestamped facade that we insert in front of non-timestamped stores, but I don't want to over-engineer it until we have a few different query types in the code base to make sure that what we do works in general. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org