[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771746009



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##
@@ -267,17 +281,41 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
         return result;
     }
 
+
     @SuppressWarnings("unchecked")
+    private <R> QueryResult<R> runKeyQuery(final Query<R> query,
+                                           final PositionBound positionBound,
+                                           final boolean collectExecutionInfo) {
+        final QueryResult<R> result;
+        final KeyQuery<K, V> typedKeyQuery = (KeyQuery<K, V>) query;
+        final KeyQuery<Bytes, byte[]> rawKeyQuery =
+            KeyQuery.withKey(keyBytes(typedKeyQuery.getKey()));
+        final QueryResult<byte[]> rawResult =
+            wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo);
+        if (rawResult.isSuccess()) {
+            final Deserializer<V> deserializer = getValueDeserializer();
+            final V value = deserializer.deserialize(serdes.topic(), rawResult.getResult());
+            final QueryResult<V> typedQueryResult =
+                rawResult.swapResult(value);
+            result = (QueryResult<R>) typedQueryResult;
+        } else {
+            // the generic type doesn't matter, since failed queries have no result set.
+            result = (QueryResult<R>) rawResult;
+        }
+        return result;
+    }
+
+    @SuppressWarnings({"unchecked", "rawtypes"})
     private Deserializer<V> getValueDeserializer() {
-        final Serde<V> vSerde = serdes.valueSerde();
+        final Serde<V> valueSerde = serdes.valueSerde();
         final boolean timestamped = WrappedStateStore.isTimestamped(wrapped());
         final Deserializer<V> deserializer;
-        if (!timestamped && vSerde instanceof ValueAndTimestampSerde) {
+        if (!timestamped && valueSerde instanceof ValueAndTimestampSerde) {

Review comment:
   I know it's weird, but it is correct. I would like to revisit it, but I 
think we really need to do that after the current round of queries is 
implemented.








[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771745563



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
##
@@ -677,18 +638,56 @@ public void shouldHandlePingQuery() {
 }
 }
 
+    public <V> void shouldHandleKeyQuery(
+        final Integer key,
+        final Function<V, Integer> valueExtactor,
+        final Integer expectedValue) {
+
+        final KeyQuery<Integer, V> query = KeyQuery.withKey(key);
+        final StateQueryRequest<V> request =
+            inStore(STORE_NAME)
+                .withQuery(query)
+                .withPartitions(mkSet(0, 1))
+                .withPositionBound(PositionBound.at(INPUT_POSITION));
+
+        final StateQueryResult<V> result =
+            IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
+
+        final QueryResult<V> queryResult =
+            result.getGlobalResult() != null

Review comment:
   This line was written before I scratched global store support from the 
current scope. I'll drop the check from this test for now.








[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771742317



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##
@@ -162,15 +163,39 @@ public static boolean isPermitted(
 }
 final R result = (R) iterator;
 return QueryResult.forResult(result);
-} catch (final Throwable t) {
-final String message = parseStoreException(t, store, query);
+} catch (final Exception e) {
+final String message = parseStoreException(e, store, query);
 return QueryResult.forFailure(
 FailureReason.STORE_EXCEPTION,
 message
 );
 }
 }
 
+    @SuppressWarnings("unchecked")
+    private static <R> QueryResult<R> runKeyQuery(final Query<R> query,
+                                                  final PositionBound positionBound,
+                                                  final boolean collectExecutionInfo,
+                                                  final StateStore store) {
+        if (store instanceof KeyValueStore) {
+            final KeyQuery<Bytes, byte[]> rawKeyQuery = (KeyQuery<Bytes, byte[]>) query;
+            final KeyValueStore<Bytes, byte[]> keyValueStore =
+                (KeyValueStore<Bytes, byte[]>) store;
+            try {
+                final byte[] bytes = keyValueStore.get(rawKeyQuery.getKey());
+                return (QueryResult<R>) QueryResult.forResult(bytes);

Review comment:
   Thanks; let's keep that in mind as we tackle some of the API refactor 
tasks we've queued up. We started with the RawXQuery approach, then dropped it. 
Before we add it back, I think we'd better have a representative set of queries 
and also bear in mind all the other sharp edges we'd like to smooth over before 
release.








[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771741301



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##
@@ -253,12 +262,17 @@ public boolean setFlushListener(final 
CacheFlushListener listener,
 rawRangeQuery = RangeQuery.withNoBounds();
 }
 final QueryResult> rawResult =
-wrapped().query(rawRangeQuery, positionBound, 
collectExecutionInfo);
+wrapped().query(rawRangeQuery, positionBound, 
collectExecutionInfo);
 if (rawResult.isSuccess()) {
 final KeyValueIterator iterator = 
rawResult.getResult();
 final KeyValueIterator resultIterator = new 
MeteredKeyValueTimestampedIterator(
-iterator, getSensor, getValueDeserializer());
-final QueryResult> typedQueryResult = 
QueryResult.forResult(resultIterator);
+iterator,
+getSensor,
+getValueDeserializer()
+);
+final QueryResult> typedQueryResult = 
rawResult.swapResult(
+resultIterator

Review comment:
   Probably autoformatted because the line was too long.








[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771741132



##
File path: streams/src/main/java/org/apache/kafka/streams/query/RangeQuery.java
##
@@ -34,7 +34,7 @@
  * 
  */
 @Evolving
-public class RangeQuery<K, V> implements Query<KeyValueIterator<K, V>> {
+public final class RangeQuery<K, V> implements Query<KeyValueIterator<K, V>> {

Review comment:
   https://issues.apache.org/jira/browse/KAFKA-13554








[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771739597



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##
@@ -186,6 +203,68 @@ public boolean setFlushListener(final 
CacheFlushListener listener,
 return false;
 }
 
+@SuppressWarnings("unchecked")
+@Override
+public  QueryResult query(final Query query,
+final PositionBound positionBound,
+final boolean collectExecutionInfo) {
+
+final long start = System.nanoTime();
+final QueryResult result;
+
+final QueryHandler handler = queryHandlers.get(query.getClass());
+if (handler == null) {
+result = wrapped().query(query, positionBound, 
collectExecutionInfo);
+if (collectExecutionInfo) {
+result.addExecutionInfo(
+"Handled in " + getClass() + " in " + (System.nanoTime() - 
start) + "ns");
+}
+} else {
+result = (QueryResult) handler.apply(
+query,
+positionBound,
+collectExecutionInfo,
+this
+);
+if (collectExecutionInfo) {
+result.addExecutionInfo(
+"Handled in " + getClass() + " with serdes "
++ serdes + " in " + (System.nanoTime() - start) + 
"ns");
+}
+}
+return result;
+}
+
+@SuppressWarnings("unchecked")
+private  QueryResult runKeyQuery(final Query query,
+final PositionBound positionBound, final boolean collectExecutionInfo) 
{
+final QueryResult result;
+final KeyQuery typedQuery = (KeyQuery) query;
+final KeyQuery rawKeyQuery = 
KeyQuery.withKey(keyBytes(typedQuery.getKey()));
+final QueryResult rawResult =
+wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo);
+if (rawResult.isSuccess()) {
+final boolean timestamped = 
WrappedStateStore.isTimestamped(wrapped());
+final Serde vSerde = serdes.valueSerde();
+final Deserializer deserializer;
+if (!timestamped && vSerde instanceof ValueAndTimestampSerde) {
+final ValueAndTimestampDeserializer 
valueAndTimestampDeserializer =
+(ValueAndTimestampDeserializer) ((ValueAndTimestampSerde) 
vSerde).deserializer();
+deserializer = (Deserializer) 
valueAndTimestampDeserializer.valueDeserializer;
+} else {
+deserializer = vSerde.deserializer();
+}

Review comment:
   Sorry, I missed this thread before. I think these points are discussed 
on other threads in this PR, though. Tl;dr: I think we should aim to clean this 
up in https://issues.apache.org/jira/browse/KAFKA-13526
   
   For now, I believe this logic is correct. However, it's good that you 
pointed out we're only testing all _dsl_ store combinations. I filed 
https://issues.apache.org/jira/browse/KAFKA-13553 to extend the IT to also test 
all _papi_ store combinations.








[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771737653



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##
@@ -186,6 +203,68 @@ public boolean setFlushListener(final 
CacheFlushListener listener,
 return false;
 }
 
+@SuppressWarnings("unchecked")
+@Override
+public  QueryResult query(final Query query,
+final PositionBound positionBound,
+final boolean collectExecutionInfo) {
+
+final long start = System.nanoTime();
+final QueryResult result;
+
+final QueryHandler handler = queryHandlers.get(query.getClass());
+if (handler == null) {
+result = wrapped().query(query, positionBound, 
collectExecutionInfo);
+if (collectExecutionInfo) {

Review comment:
   I'm getting the impression that you're not a huge fan of the phrasing of 
these messages. :) Can we tackle this question in a follow-on fashion?








[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771736938



##
File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
##
@@ -197,6 +197,18 @@ public R getResult() {
 return result;
 }
 
+    @SuppressWarnings("unchecked")
+    public <V> QueryResult<V> swapResult(final V value) {
+        if (isFailure()) {
+            return (QueryResult<V>) this;
+        } else {
+            final QueryResult<V> result = new QueryResult<>(value);

Review comment:
   Thanks, @mjsax , I'm not sure precisely what you mean. This does create 
a new object. If you think it would be clearer to add a constructor allowing 
people to set the result along with a pre-populated executionInfo and position 
instead, we could, but this is the API we agreed on in the KIP.
   
   I want this new API to have good ergonomics, so I do want to consider these, 
but I don't think we need to hold up the KeyQuery PR on it.








[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771735849



##
File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
##
@@ -197,6 +197,18 @@ public R getResult() {
 return result;
 }
 
+    @SuppressWarnings("unchecked")
+    public <V> QueryResult<V> swapResult(final V value) {
+        if (isFailure()) {
+            return (QueryResult<V>) this;
+        } else {
+            final QueryResult<V> result = new QueryResult<>(value);

Review comment:
   Thanks, @guozhangwang , I think something like that will be the outcome 
of this follow-on work: https://issues.apache.org/jira/browse/KAFKA-13526
   
   We'll tackle that question before the first release of this new API.








[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771735440



##
File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
##
@@ -197,6 +197,18 @@ public R getResult() {
 return result;
 }
 
+    @SuppressWarnings("unchecked")
+    public <V> QueryResult<V> swapResult(final V value) {
+        if (isFailure()) {
+            return (QueryResult<V>) this;

Review comment:
   sounds good!








[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771735231



##
File path: streams/src/main/java/org/apache/kafka/streams/query/RangeQuery.java
##
@@ -34,7 +34,7 @@
  * 
  */
 @Evolving
-public class RangeQuery<K, V> implements Query<KeyValueIterator<K, V>> {
+public final class RangeQuery<K, V> implements Query<KeyValueIterator<K, V>> {

Review comment:
   Good point. We can rename it to KeyRangeQuery in a follow-on PR. I'll 
file a Jira.








[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771687546



##
File path: streams/src/main/java/org/apache/kafka/streams/query/RangeQuery.java
##
@@ -34,7 +34,7 @@
  * 
  */
 @Evolving
-public class RangeQuery<K, V> implements Query<KeyValueIterator<K, V>> {
+public final class RangeQuery<K, V> implements Query<KeyValueIterator<K, V>> {

Review comment:
   I had a review comment to add this to KeyQuery, so I added it to 
RangeQuery for exactly the same reason.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/PingQuery.java
##
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.streams.query.Query;
-
-/**
- * A very simple query that all stores can handle to verify that the store is 
participating in the
- * IQv2 framework properly.
- * 
- * This is not a public API and may change without notice.
- */
-public class PingQuery implements Query {

Review comment:
   Removed, since it was only for validating the framework in the absence 
of any query implementations, and now we have query implementations.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##
@@ -54,22 +55,22 @@
 );
 }
 
-private static final Map, QueryHandler> QUERY_HANDLER_MAP =
+@SuppressWarnings("rawtypes")
+private static final Map QUERY_HANDLER_MAP =
 mkMap(
-mkEntry(
-PingQuery.class,
-(query, positionBound, collectExecutionInfo, store) -> 
QueryResult.forResult(true)
-),

Review comment:
   Removed; see the comment on the PingQuery class.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##
@@ -162,15 +163,39 @@ public static boolean isPermitted(
 }
 final R result = (R) iterator;
 return QueryResult.forResult(result);
-} catch (final Throwable t) {
-final String message = parseStoreException(t, store, query);
+} catch (final Exception e) {

Review comment:
   Changed from Throwable to Exception to avoid swallowing Errors.

##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
##
@@ -527,10 +541,11 @@ public void verifyStore() {
 private void globalShouldRejectAllQueries() {
 // See KAFKA-13523
 
-final PingQuery query = new PingQuery();
-final StateQueryRequest request = 
inStore(STORE_NAME).withQuery(query);
+final KeyQuery> query = 
KeyQuery.withKey(1);

Review comment:
   Also replaced the PingQuery here. It also doesn't affect the evaluation.

##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
##
@@ -630,29 +609,11 @@ public void shouldHandlePingQuery() {
 .withQuery(query)
 .withPartitions(mkSet(0, 1))
 .withPositionBound(PositionBound.at(INPUT_POSITION));
-
 final StateQueryResult> result =
 IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
 
 if (result.getGlobalResult() != null) {
-final QueryResult> queryResult = 
result.getGlobalResult();
-final boolean failure = queryResult.isFailure();
-if (failure) {
-throw new AssertionError(queryResult.toString());
-}
-assertThat(queryResult.isSuccess(), is(true));
-
-assertThrows(IllegalArgumentException.class, 
queryResult::getFailureReason);
-assertThrows(IllegalArgumentException.class,
-queryResult::getFailureMessage);
-
-final KeyValueIterator iterator = 
queryResult.getResult();
-final Set actualValue = new HashSet<>();
-while (iterator.hasNext()) {
-actualValue.add(valueExtactor.apply(iterator.next().value));
-}
-assertThat(actualValue, is(expectedValue));
-assertThat(queryResult.getExecutionInfo(), is(empty()));
+   

[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771530728



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##
@@ -42,16 +102,21 @@ private StoreQueryUtils() {
 final int partition
 ) {
 
-final QueryResult result;
 final long start = collectExecutionInfo ? System.nanoTime() : -1L;
-if (query instanceof PingQuery) {
-if (!isPermitted(position, positionBound, partition)) {
-result = QueryResult.notUpToBound(position, positionBound, 
partition);
-} else {
-result = (QueryResult) QueryResult.forResult(true);
-}
-} else {
+final QueryResult result;
+
+final QueryHandler handler = QUERY_HANDLER_MAP.get(query.getClass());

Review comment:
   Yep, that's accurate, but many of the stores will have the exact same 
logic as each other, so it made sense to consolidate it, which is what this 
util class is for.
   
   The function in the query map just checks the type of the store so that it 
can either cast it to execute the query or return "unknown query". That way, we 
can use the same dispatch map for all queries.








[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771520958



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##
@@ -16,18 +16,78 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.api.RecordMetadata;
+import org.apache.kafka.streams.query.FailureReason;
+import org.apache.kafka.streams.query.KeyQuery;
 import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.query.PositionBound;
 import org.apache.kafka.streams.query.Query;
 import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueStore;
 
+import java.io.PrintWriter;
+import java.io.StringWriter;
 import java.util.Map;
 
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
 public final class StoreQueryUtils {
 
+/**
+ * a utility interface to facilitate stores' query dispatch logic,
+ * allowing them to generically store query execution logic as the values
+ * in a map.
+ */
+@FunctionalInterface
+public interface QueryHandler {
+QueryResult apply(
+final Query query,
+final PositionBound positionBound,
+final boolean collectExecutionInfo,
+final StateStore store
+);
+}
+
+
+    @SuppressWarnings("unchecked")
+    private static final Map<Class<?>, QueryHandler> QUERY_HANDLER_MAP =
+        mkMap(
+            mkEntry(
+                PingQuery.class,
+                (query, positionBound, collectExecutionInfo, store) -> QueryResult.forResult(true)
+            ),
+            mkEntry(KeyQuery.class,
+                (query, positionBound, collectExecutionInfo, store) -> {
+                    if (store instanceof KeyValueStore) {
+                        final KeyQuery<Bytes, byte[]> rawKeyQuery =
+                            (KeyQuery<Bytes, byte[]>) query;
+                        final KeyValueStore<Bytes, byte[]> keyValueStore =
+                            (KeyValueStore<Bytes, byte[]>) store;
+                        try {
+                            final byte[] bytes =
+                                keyValueStore.get(rawKeyQuery.getKey());
+                            return QueryResult.forResult(bytes);
+                        } catch (final Throwable t) {

Review comment:
   Good point. It's fine to catch Throwables, but it's not fine to swallow 
Errors, as I'm doing here.
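   
   The fix, roughly (paraphrasing the StoreQueryUtils diffs quoted elsewhere in this thread, with the same names):
   
       try {
           final byte[] bytes = keyValueStore.get(rawKeyQuery.getKey());
           return QueryResult.forResult(bytes);
       } catch (final Exception e) {  // was: catch (final Throwable t)
           // Exceptions still become a STORE_EXCEPTION failure result,
           // but Errors (e.g. OutOfMemoryError) now propagate instead of being swallowed.
           final String message = parseStoreException(e, store, query);
           return QueryResult.forFailure(FailureReason.STORE_EXCEPTION, message);
       }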








[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771517463



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##
@@ -16,18 +16,78 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.api.RecordMetadata;
+import org.apache.kafka.streams.query.FailureReason;
+import org.apache.kafka.streams.query.KeyQuery;
 import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.query.PositionBound;
 import org.apache.kafka.streams.query.Query;
 import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueStore;
 
+import java.io.PrintWriter;
+import java.io.StringWriter;
 import java.util.Map;
 
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
 public final class StoreQueryUtils {
 
+/**
+ * a utility interface to facilitate stores' query dispatch logic,
+ * allowing them to generically store query execution logic as the values
+ * in a map.
+ */
+@FunctionalInterface
+public interface QueryHandler {
+QueryResult apply(
+final Query query,
+final PositionBound positionBound,
+final boolean collectExecutionInfo,
+final StateStore store
+);
+}
+
+
+@SuppressWarnings("unchecked")
+private static final Map, QueryHandler> QUERY_HANDLER_MAP =
+mkMap(
+mkEntry(
+PingQuery.class,

Review comment:
   Actually, the PingQuery isn't in the KIP at all. I added it (as an 
internal API) so that I could verify the stores work properly in the absence of 
any actual queries (because I implemented the framework before any real 
queries, to control the scope of the PR).
   
   Now that we have real queries, I don't think we need to keep Ping around. 
I'll remove it.








[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771517463



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##
@@ -16,18 +16,78 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.api.RecordMetadata;
+import org.apache.kafka.streams.query.FailureReason;
+import org.apache.kafka.streams.query.KeyQuery;
 import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.query.PositionBound;
 import org.apache.kafka.streams.query.Query;
 import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueStore;
 
+import java.io.PrintWriter;
+import java.io.StringWriter;
 import java.util.Map;
 
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
 public final class StoreQueryUtils {
 
+/**
+ * a utility interface to facilitate stores' query dispatch logic,
+ * allowing them to generically store query execution logic as the values
+ * in a map.
+ */
+@FunctionalInterface
+public interface QueryHandler {
+QueryResult apply(
+final Query query,
+final PositionBound positionBound,
+final boolean collectExecutionInfo,
+final StateStore store
+);
+}
+
+
+@SuppressWarnings("unchecked")
+private static final Map, QueryHandler> QUERY_HANDLER_MAP =
+mkMap(
+mkEntry(
+PingQuery.class,

Review comment:
   Actually, the PingQuery isn't in the KIP at all. I added it so that I 
could verify the stores work properly in the absence of any actual queries 
(because I implemented the framework before any real queries, to control the 
scope of the PR).
   
   Now that we have real queries, I don't think we need to keep Ping around. 
I'll remove it.








[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771515988



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##
@@ -16,18 +16,78 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.api.RecordMetadata;
+import org.apache.kafka.streams.query.FailureReason;
+import org.apache.kafka.streams.query.KeyQuery;
 import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.query.PositionBound;
 import org.apache.kafka.streams.query.Query;
 import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueStore;
 
+import java.io.PrintWriter;
+import java.io.StringWriter;
 import java.util.Map;
 
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
 public final class StoreQueryUtils {
 
+/**
+ * a utility interface to facilitate stores' query dispatch logic,
+ * allowing them to generically store query execution logic as the values
+ * in a map.
+ */
+@FunctionalInterface
+public interface QueryHandler {
+QueryResult apply(
+final Query query,
+final PositionBound positionBound,
+final boolean collectExecutionInfo,
+final StateStore store
+);
+}
+
+
+@SuppressWarnings("unchecked")
+private static final Map, QueryHandler> QUERY_HANDLER_MAP =

Review comment:
   They both exist to dispatch query execution logic. The MeteredStores' 
logic is to translate results from the inner stores, and the inner stores' 
logic is to execute the query. Since we have a lot of functionally identical 
stores (i.e., many KeyValue stores, etc.), it made sense to consolidate their 
execution logic here instead of duplicating it in every store class.








[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771513674



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##
@@ -16,18 +16,78 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.api.RecordMetadata;
+import org.apache.kafka.streams.query.FailureReason;
+import org.apache.kafka.streams.query.KeyQuery;
 import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.query.PositionBound;
 import org.apache.kafka.streams.query.Query;
 import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueStore;
 
+import java.io.PrintWriter;
+import java.io.StringWriter;
 import java.util.Map;
 
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
 public final class StoreQueryUtils {
 
+/**
+ * a utility interface to facilitate stores' query dispatch logic,
+ * allowing them to generically store query execution logic as the values
+ * in a map.
+ */
+@FunctionalInterface

Review comment:
   The compiler is smart enough. It's just an informative annotation. Its 
only practical purpose is to raise a compilation error if you try to declare 
more than one abstract method in it.
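   
   A tiny self-contained illustration of that, with a toy interface rather than the real QueryHandler:
   
       public class FunctionalInterfaceSketch {
           @FunctionalInterface
           interface Handler {
               String handle(String query);
               // declaring a second abstract method here would be a compile-time error,
               // which is the only thing the annotation actually enforces
           }
       
           public static void main(final String[] args) {
               final Handler handler = q -> "result for " + q; // lambdas work with or without the annotation
               System.out.println(handler.handle("KeyQuery"));
           }
       }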








[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771512115



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##
@@ -186,6 +203,68 @@ public boolean setFlushListener(final 
CacheFlushListener listener,
 return false;
 }
 
+@SuppressWarnings("unchecked")
+@Override
+public  QueryResult query(final Query query,
+final PositionBound positionBound,
+final boolean collectExecutionInfo) {
+
+final long start = System.nanoTime();
+final QueryResult result;
+
+final QueryHandler handler = queryHandlers.get(query.getClass());
+if (handler == null) {
+result = wrapped().query(query, positionBound, 
collectExecutionInfo);
+if (collectExecutionInfo) {
+result.addExecutionInfo(
+"Handled in " + getClass() + " in " + (System.nanoTime() - 
start) + "ns");
+}
+} else {
+result = (QueryResult) handler.apply(
+query,
+positionBound,
+collectExecutionInfo,
+this
+);
+if (collectExecutionInfo) {
+result.addExecutionInfo(
+"Handled in " + getClass() + " with serdes "
++ serdes + " in " + (System.nanoTime() - start) + 
"ns");
+}
+}
+return result;
+}
+
+@SuppressWarnings("unchecked")
+private  QueryResult runKeyQuery(final Query query,
+final PositionBound positionBound, final boolean collectExecutionInfo) 
{
+final QueryResult result;
+final KeyQuery typedQuery = (KeyQuery) query;
+final KeyQuery rawKeyQuery = 
KeyQuery.withKey(keyBytes(typedQuery.getKey()));
+final QueryResult rawResult =
+wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo);
+if (rawResult.isSuccess()) {
+final boolean timestamped = 
WrappedStateStore.isTimestamped(wrapped());

Review comment:
   I'd forgotten about MeteredTimestampedKeyValueStore, but now that I'm 
looking at it, what it does is extend the MeteredKeyValueStore, apparently 
specifically to pad the value serde with a ValueAndTimestamp serde. Otherwise, 
all the logic lives in MeteredKeyValueStore.
   
   Also, because we always wrap the non-timestamped store with the 
`KeyValueToTimestampedKeyValueByteStoreAdapter`, we also always pass through 
the MeteredTimestampedKeyValueStore whether the inner store is really 
timestamped or not.
   
   I think we could clean this whole hierarchy up a bit, but it's not necessary 
as part of this work.








[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771505654



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##
@@ -186,6 +203,68 @@ public boolean setFlushListener(final 
CacheFlushListener listener,
 return false;
 }
 
+@SuppressWarnings("unchecked")
+@Override
+public  QueryResult query(final Query query,
+final PositionBound positionBound,
+final boolean collectExecutionInfo) {
+
+final long start = System.nanoTime();
+final QueryResult result;
+
+final QueryHandler handler = queryHandlers.get(query.getClass());
+if (handler == null) {
+result = wrapped().query(query, positionBound, 
collectExecutionInfo);
+if (collectExecutionInfo) {
+result.addExecutionInfo(
+"Handled in " + getClass() + " in " + (System.nanoTime() - 
start) + "ns");
+}
+} else {
+result = (QueryResult) handler.apply(
+query,
+positionBound,
+collectExecutionInfo,
+this
+);
+if (collectExecutionInfo) {
+result.addExecutionInfo(
+"Handled in " + getClass() + " with serdes "
++ serdes + " in " + (System.nanoTime() - start) + 
"ns");
+}
+}
+return result;
+}
+
+@SuppressWarnings("unchecked")
+private  QueryResult runKeyQuery(final Query query,
+final PositionBound positionBound, final boolean collectExecutionInfo) 
{
+final QueryResult result;
+final KeyQuery typedQuery = (KeyQuery) query;
+final KeyQuery rawKeyQuery = 
KeyQuery.withKey(keyBytes(typedQuery.getKey()));
+final QueryResult rawResult =
+wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo);
+if (rawResult.isSuccess()) {
+final boolean timestamped = 
WrappedStateStore.isTimestamped(wrapped());
+final Serde vSerde = serdes.valueSerde();
+final Deserializer deserializer;
+if (!timestamped && vSerde instanceof ValueAndTimestampSerde) {

Review comment:
   Yes, this is super weird, and I think that 
https://issues.apache.org/jira/browse/KAFKA-13526 will give us a more elegant 
way to correct it, but as it stands right now, this is necessary.
   
   The MeteredStore's serde is always a ValueAndTimestamp serde regardless of 
whether the inner store is Timestamped or not. This works because the normal 
execution flow actually converts the byte results from non-timestamped stores 
into the binary schema of a ValueAndTimestamp.
   
   What we do is, when you have a non-timestamped store, we wrap it with an 
extra layer 
(`org.apache.kafka.streams.state.internals.KeyValueToTimestampedKeyValueByteStoreAdapter`)
 that pads the returned values with a fake timestamp 
(`org.apache.kafka.streams.state.TimestampedBytesStore#convertToTimestampedFormat`).
 That makes sense when the store is used by processors (particularly the ones 
in the DSL) because it makes the store configuration orthogonal to the 
processor logic, but for IQ, it's just spending extra time and memory for no 
productive purpose.
   
   One of the primary design goals of IQv2 is to make query execution as lean 
as possible, so we did not implement the same padding logic for non-timestamped 
data and instead just bubble up to the MeteredStore the actual byte array 
returned from the BytesStore. Which means that if we want to deserialize it, we 
need to know whether to use the ValueAndTimestamp deserializer or just the 
Value's deserializer.
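   
   To make the "padding" concrete: the timestamped binary schema is essentially an 8-byte timestamp 
followed by the plain value bytes. The sketch below illustrates the idea with an assumed layout and 
constants; it is not copied from the Kafka code.
   
       import java.nio.ByteBuffer;
       import java.util.Arrays;
       
       public class TimestampedFormatSketch {
           // pad a plain value into the assumed [8-byte timestamp][value bytes] layout
           static byte[] convertToTimestampedFormat(final byte[] plainValue, final long timestamp) {
               return ByteBuffer.allocate(Long.BYTES + plainValue.length)
                   .putLong(timestamp)
                   .put(plainValue)
                   .array();
           }
       
           // what a ValueAndTimestamp-style deserializer strips back off to recover the plain value
           static byte[] rawValue(final byte[] timestampedValue) {
               return Arrays.copyOfRange(timestampedValue, Long.BYTES, timestampedValue.length);
           }
       
           public static void main(final String[] args) {
               final byte[] padded = convertToTimestampedFormat("hello".getBytes(), -1L); // -1 as a "no timestamp" marker
               System.out.println(new String(rawValue(padded))); // hello
           }
       }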








[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771495764



##
File path: streams/src/main/java/org/apache/kafka/streams/query/KeyQuery.java
##
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+
+@Evolving
+public class KeyQuery<K, V> implements Query<V> {

Review comment:
   Good idea. I'm not sure whether it will ultimately be good to extend 
queries with other queries later, but it doesn't hurt to add this now so that 
we can make an explicit decision about it later.








[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771494393



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##
@@ -186,6 +203,68 @@ public boolean setFlushListener(final 
CacheFlushListener listener,
 return false;
 }
 
+@SuppressWarnings("unchecked")
+@Override
+public  QueryResult query(final Query query,
+final PositionBound positionBound,
+final boolean collectExecutionInfo) {
+
+final long start = System.nanoTime();
+final QueryResult result;
+
+final QueryHandler handler = queryHandlers.get(query.getClass());
+if (handler == null) {
+result = wrapped().query(query, positionBound, 
collectExecutionInfo);
+if (collectExecutionInfo) {
+result.addExecutionInfo(
+"Handled in " + getClass() + " in " + (System.nanoTime() - 
start) + "ns");
+}
+} else {
+result = (QueryResult) handler.apply(
+query,
+positionBound,
+collectExecutionInfo,
+this
+);
+if (collectExecutionInfo) {
+result.addExecutionInfo(
+"Handled in " + getClass() + " with serdes "
++ serdes + " in " + (System.nanoTime() - start) + 
"ns");
+}
+}
+return result;
+}
+
+@SuppressWarnings("unchecked")
+private  QueryResult runKeyQuery(final Query query,
+final PositionBound positionBound, final boolean collectExecutionInfo) 
{

Review comment:
   Sorry about that; oversight.








[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771493968



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##
@@ -186,6 +203,68 @@ public boolean setFlushListener(final 
CacheFlushListener listener,
 return false;
 }
 
+@SuppressWarnings("unchecked")
+@Override
+public  QueryResult query(final Query query,
+final PositionBound positionBound,
+final boolean collectExecutionInfo) {
+
+final long start = System.nanoTime();
+final QueryResult result;
+
+final QueryHandler handler = queryHandlers.get(query.getClass());
+if (handler == null) {
+result = wrapped().query(query, positionBound, 
collectExecutionInfo);
+if (collectExecutionInfo) {
+result.addExecutionInfo(
+"Handled in " + getClass() + " in " + (System.nanoTime() - 
start) + "ns");
+}
+} else {
+result = (QueryResult) handler.apply(
+query,
+positionBound,
+collectExecutionInfo,
+this
+);
+if (collectExecutionInfo) {
+result.addExecutionInfo(
+"Handled in " + getClass() + " with serdes "
++ serdes + " in " + (System.nanoTime() - start) + 
"ns");

Review comment:
   Thanks; this seems about the same, and it would apply to all the other 
execution info messages we've got, so I think I'll keep it the same for now.








[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771492764



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##
@@ -186,6 +203,68 @@ public boolean setFlushListener(final 
CacheFlushListener listener,
 return false;
 }
 
+@SuppressWarnings("unchecked")
+@Override
+public  QueryResult query(final Query query,
+final PositionBound positionBound,
+final boolean collectExecutionInfo) {
+
+final long start = System.nanoTime();
+final QueryResult result;
+
+final QueryHandler handler = queryHandlers.get(query.getClass());
+if (handler == null) {
+result = wrapped().query(query, positionBound, 
collectExecutionInfo);
+if (collectExecutionInfo) {

Review comment:
   Yeah, the idea was to actually be able to see everything that happened 
during query execution, specifically to demystify what's going on when you're 
debugging.








[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771490884



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##
@@ -186,6 +203,68 @@ public boolean setFlushListener(final 
CacheFlushListener listener,
 return false;
 }
 
+@SuppressWarnings("unchecked")
+@Override
+public  QueryResult query(final Query query,
+final PositionBound positionBound,
+final boolean collectExecutionInfo) {
+
+final long start = System.nanoTime();

Review comment:
   Not a bad idea!








[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771472556



##
File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
##
@@ -197,6 +197,18 @@ public R getResult() {
 return result;
 }
 
+    @SuppressWarnings("unchecked")
+    public <V> QueryResult<V> swapResult(final V value) {
+        if (isFailure()) {
+            return (QueryResult<V>) this;
+        } else {
+            final QueryResult<V> result = new QueryResult<>(value);

Review comment:
   What's happening here is that we're turning a `QueryResult<byte[]>` into a 
`QueryResult<V>`. A concrete example (in fact the only use case) of this is in 
the MeteredStore: we get back a raw result from the BytesStore and need to 
deserialize it, so we need to convert the `QueryResult<byte[]>` into a 
`QueryResult<V>` or something.
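   
   Roughly, the MeteredKeyValueStore#runKeyQuery diff earlier in this thread does:
   
       final QueryResult<byte[]> rawResult =
           wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo);
       final V value = deserializer.deserialize(serdes.topic(), rawResult.getResult());
       // swapResult keeps the execution info and position of the raw result; only the result type changes
       final QueryResult<V> typedQueryResult = rawResult.swapResult(value);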








[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771470456



##
File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
##
@@ -197,6 +197,18 @@ public R getResult() {
 return result;
 }
 
+    @SuppressWarnings("unchecked")
+    public <V> QueryResult<V> swapResult(final V value) {
+        if (isFailure()) {
+            return (QueryResult<V>) this;

Review comment:
   In the case of a failure, there is no result, just the failure message. 
I wanted to maintain an invariant that there is always either a failure or a 
result, but not both or neither. I also didn't think it would be right to allow 
accidentally converting a failure to a successful result via this method.
   
   I suppose we could throw an IllegalStateException, since the caller probably 
shouldn't even be trying to "swap" the result on a failed result to begin with, 
but this seems fine, too.








[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771466959



##
File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
##
@@ -29,10 +29,10 @@
  */
 public final class QueryResult {
 
-    private final List<String> executionInfo = new LinkedList<>();
     private final FailureReason failureReason;
     private final String failure;
     private final R result;
+    private List<String> executionInfo = new LinkedList<>();

Review comment:
   Thanks; that would be another way to do it. I'm not sure if that would 
be clearly better or not, though.
   
   It's for getting more details about how the query was actually executed 
inside of Streams. Right now, if you request it as part of the query, each 
store layer will report what it did and how long it took. For runtime queries, 
you wouldn't want to use it, but I wanted to enable debugging in the cases 
where query execution seems like it's taking longer than expected. Also, it 
could be used for tracing, in which every Nth query is run with the execution 
info on.
   
   It's a list of Strings so that each store layer / operation can just add one 
"line" of info (like a stack trace), but we don't waste time and memory 
actually concatenating them with newlines. We considered adding more structure 
(such as having a field for execution time), but kept it as a string so as not 
to restrict the kind of "execution information" we might find useful to add in 
the future.
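   
   Concretely, each store layer appends one line, as in the MeteredKeyValueStore#query diff quoted 
earlier in this thread:
   
       final long start = System.nanoTime();
       final QueryResult<R> result = wrapped().query(query, positionBound, collectExecutionInfo);
       if (collectExecutionInfo) {
           // one "line" per store layer, like a stack trace; nothing is concatenated unless the caller prints it
           result.addExecutionInfo(
               "Handled in " + getClass() + " in " + (System.nanoTime() - start) + "ns");
       }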








[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771459374



##
File path: 
streams/src/main/java/org/apache/kafka/streams/query/FailureReason.java
##
@@ -52,5 +52,12 @@
  * The requested store partition does not exist at all. For example, 
partition 4 was requested,
  * but the store in question only has 4 partitions (0 through 3).
  */
-DOES_NOT_EXIST;
+DOES_NOT_EXIST,
+
+/**
+ * The store that handled the query got an exception during query 
execution. The message
+ * will contain the exception details. Depending on the nature of the 
exception, the caller
+ * may be able to retry this instance or may need to try a different 
instance.
+ */
+STORE_EXCEPTION;

Review comment:
   Thanks! @guozhangwang reminded me during the discussion to make sure 
that all the cases in that KIP were accounted for. Some are still exceptions, 
and some are now FailureReasons: 
https://lists.apache.org/thread/brvwvpvsbsfvqpqg6jvry5hqny0vm2tr








[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771454005



##
File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
##
@@ -197,6 +197,18 @@ public R getResult() {
 return result;
 }
 
+    @SuppressWarnings("unchecked")
+    public <V> QueryResult<V> swapResult(final V value) {

Review comment:
   The purpose of this method is to allow `MeteredKeyValue` store to 
deserialize the result without wiping out the execution info or position that 
it got back from the bytes store. I missed that while reviewing your PR, so I 
went ahead and added a fix for it to this one.
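   
   A sketch of what that looks like, consistent with the diff above; the exact field copies are an 
assumption on my part, not quoted from the PR:
   
       @SuppressWarnings("unchecked")
       public <V> QueryResult<V> swapResult(final V value) {
           if (isFailure()) {
               return (QueryResult<V>) this;           // failures carry no result to swap
           } else {
               final QueryResult<V> result = new QueryResult<>(value);
               result.executionInfo = executionInfo;   // keep what the inner (bytes) store reported
               result.position = position;             // keep the position the inner store observed
               return result;
           }
       }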








[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-13 Thread GitBox


vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r767856835



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##
@@ -79,6 +88,14 @@
 private StreamsMetricsImpl streamsMetrics;
 private TaskId taskId;
 
+private  Map queryHandlers =
+mkMap(
+mkEntry(
+KeyQuery.class,
+(query, positionBound, collectExecutionInfo, store) -> 
runKeyQuery(query, positionBound, collectExecutionInfo)
+)
+);
+

Review comment:
   Just trying to establish some pattern here that can let us dispatch 
these queries efficiently. This O(1) lookup should be faster than an O(n) 
if/else check or an O(log n) string switch statement, but we won't know for 
sure without benchmarking.
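
   For reference, the shape of the pattern is roughly the sketch below. The 
generics are stripped in the quoted diff above, and the QueryHandler 
interface sketched here is an assumption about how the four-argument lambda 
is typed:

    // assumed functional interface behind the four-argument lambda above
    @FunctionalInterface
    interface QueryHandler {
        QueryResult<?> apply(Query<?> query,
                             PositionBound positionBound,
                             boolean collectExecutionInfo,
                             StateStore store);
    }

    // one map lookup per query instead of a chain of instanceof checks;
    // runKeyQuery is the private method added later in this change
    private final Map<Class<?>, QueryHandler> queryHandlers = mkMap(
        mkEntry(
            KeyQuery.class,
            (query, positionBound, collectExecutionInfo, store) ->
                runKeyQuery(query, positionBound, collectExecutionInfo)));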

##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
##
@@ -216,9 +269,17 @@ public boolean global() {
 
 public abstract StoreSupplier supplier();
 
+public boolean timestamped() {
+return true; // most stores are timestamped
+};
+
 public boolean global() {
 return false;
 }
+
+public boolean keyValue() {
+return false;
+}

Review comment:
   These help us adjust our expectations in the validations below, so that 
we can cover all store types in the same test.
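
   As an illustration, a plain (non-timestamped) key-value entry in the 
test's store enum might override them as below; the constant name and 
supplier are hypothetical, not necessarily the ones in the test:

    // hypothetical enum constant following the pattern above
    PLAIN_KV {
        @Override
        public StoreSupplier<?> supplier() {
            // standard non-timestamped RocksDB key-value supplier
            return Stores.persistentKeyValueStore(STORE_NAME);
        }

        @Override
        public boolean timestamped() {
            return false; // plain KeyValueStore, no timestamps
        }

        @Override
        public boolean keyValue() {
            return true; // opts in to the KeyQuery checks in verifyStore()
        }
    },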

##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
##
@@ -513,6 +590,43 @@ public void shouldHandlePingQuery() {
 assertThat(result.getPosition(), is(INPUT_POSITION));
 }
 
+public  void shouldHandleKeyQuery(
+final Integer key,
+final Function valueExtactor,
+final Integer expectedValue) {
+
+final KeyQuery query = KeyQuery.withKey(key);
+final StateQueryRequest request =
+inStore(STORE_NAME)
+.withQuery(query)
+.withPartitions(mkSet(0, 1))
+.withPositionBound(PositionBound.at(INPUT_POSITION));
+
+final StateQueryResult result =
+IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
+
+final QueryResult queryResult =
+result.getGlobalResult() != null
+? result.getGlobalResult()
+: result.getOnlyPartitionResult();
+final boolean failure = queryResult.isFailure();
+if (failure) {
+throw new AssertionError(queryResult.toString());
+}
+assertThat(queryResult.isSuccess(), is(true));
+
+assertThrows(IllegalArgumentException.class, 
queryResult::getFailureReason);
+assertThrows(IllegalArgumentException.class,
+queryResult::getFailureMessage);
+
+final V result1 = queryResult.getResult();
+final Integer integer = valueExtactor.apply(result1);
+assertThat(integer, is(expectedValue));

Review comment:
   Here's where we run that function to either get the value out of the 
ValueAndTimestamp or just give back the value with the identity function.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/query/FailureReason.java
##
@@ -52,5 +52,12 @@
  * The requested store partition does not exist at all. For example, 
partition 4 was requested,
  * but the store in question only has 4 partitions (0 through 3).
  */
-DOES_NOT_EXIST;
+DOES_NOT_EXIST,
+
+/**
+ * The store that handled the query got an exception during query 
execution. The message
+ * will contain the exception details. Depending on the nature of the 
exception, the caller
+ * may be able to retry this instance or may need to try a different 
instance.
+ */
+STORE_EXCEPTION;

Review comment:
   I realized in the implementation for RocksDB that we will need to 
account for runtime exceptions from the stores. I'll update the KIP.
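
   On the store side, the intent is roughly the sketch below; the 
forFailure/forResult factories on QueryResult are assumptions here rather 
than code quoted from this PR:

    // sketch: turn a runtime failure into a failed result instead of throwing
    try {
        final byte[] raw = get(rawKey); // e.g. a RocksDB point lookup
        return QueryResult.forResult(raw);
    } catch (final RuntimeException e) {
        return QueryResult.forFailure(
            FailureReason.STORE_EXCEPTION,
            e.getClass().getName() + ": " + e.getMessage());
    }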

##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
##
@@ -426,6 +487,22 @@ public void verifyStore() {
 shouldHandlePingQuery();
 shouldCollectExecutionInfo();
 shouldCollectExecutionInfoUnderFailure();
+
+if (storeToTest.keyValue()) {
+if (storeToTest.timestamped()) {
+shouldHandleKeyQuery(
+2,
+(Function<ValueAndTimestamp<Integer>, Integer>) 
ValueAndTimestamp::value,
+2
+);
+} else {
+shouldHandleKeyQuery(
+2,
+Function.identity(),
+2
+);
+}
+}

Review comment:
   Here's where we use those properties. KeyQueries are only implemented for 
KeyValue stores right now, so we only run these checks when the store under 
test is a key-value store.

[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-12 Thread GitBox


vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r767312544



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##
@@ -186,6 +203,68 @@ public boolean setFlushListener(final 
CacheFlushListener listener,
 return false;
 }
 
+@SuppressWarnings("unchecked")
+@Override
+public <R> QueryResult<R> query(final Query<R> query,
+final PositionBound positionBound,
+final boolean collectExecutionInfo) {
+
+final long start = System.nanoTime();
+final QueryResult result;
+
+final QueryHandler handler = queryHandlers.get(query.getClass());
+if (handler == null) {
+result = wrapped().query(query, positionBound, 
collectExecutionInfo);
+if (collectExecutionInfo) {
+result.addExecutionInfo(
+"Handled in " + getClass() + " in " + (System.nanoTime() - 
start) + "ns");
+}
+} else {
+result = (QueryResult) handler.apply(
+query,
+positionBound,
+collectExecutionInfo,
+this
+);
+if (collectExecutionInfo) {
+result.addExecutionInfo(
+"Handled in " + getClass() + " with serdes "
++ serdes + " in " + (System.nanoTime() - start) + 
"ns");
+}
+}
+return result;
+}
+
+@SuppressWarnings("unchecked")
+private <R> QueryResult<R> runKeyQuery(final Query<R> query,
+final PositionBound positionBound, final boolean collectExecutionInfo) 
{
+final QueryResult result;
+final KeyQuery typedQuery = (KeyQuery) query;
+final KeyQuery rawKeyQuery = 
KeyQuery.withKey(keyBytes(typedQuery.getKey()));
+final QueryResult rawResult =
+wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo);
+if (rawResult.isSuccess()) {
+final boolean timestamped = 
WrappedStateStore.isTimestamped(wrapped());
+final Serde vSerde = serdes.valueSerde();
+final Deserializer deserializer;
+if (!timestamped && vSerde instanceof ValueAndTimestampSerde) {
+final ValueAndTimestampDeserializer 
valueAndTimestampDeserializer =
+(ValueAndTimestampDeserializer) ((ValueAndTimestampSerde) 
vSerde).deserializer();
+deserializer = (Deserializer) 
valueAndTimestampDeserializer.valueDeserializer;
+} else {
+deserializer = vSerde.deserializer();
+}

Review comment:
   This is a bit of a hack. I think we can possibly do a better job of this 
in the Timestamped facade that we insert in front of non-timestamped stores, 
but I don't want to over-engineer it until we have a few different query types 
in the code base to make sure that what we do works in general.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org