[GitHub] [kafka] mjsax commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
mjsax commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771749966

## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
## @@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener listener,
         return false;
     }

+    @SuppressWarnings("unchecked")
+    @Override
+    public QueryResult query(final Query query,
+                             final PositionBound positionBound,
+                             final boolean collectExecutionInfo) {
+
+        final long start = System.nanoTime();
+        final QueryResult result;
+
+        final QueryHandler handler = queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, collectExecutionInfo);
+            if (collectExecutionInfo) {

Review comment:
Sure.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
mjsax commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771749813

## File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
## @@ -197,6 +197,18 @@ public R getResult() {
         return result;
     }

+    @SuppressWarnings("unchecked")
+    public QueryResult swapResult(final V value) {
+        if (isFailure()) {
+            return (QueryResult) this;
+        } else {
+            final QueryResult result = new QueryResult<>(value);

Review comment:
I mean, if the upper layers need a `QueryResult` with the proper type, and the inner layers need a `QueryResult` holding `byte[]`, we should just have two properly typed objects. Instead of "swapping", just take the `byte[]` from the inner `QueryResult`, deserialize it, and put the result into the outer `QueryResult` object.

> but this is the API we agreed on in the KIP.

I did not read the KIP :D (maybe I should have). And we can always adjust it. To me, it seems useless to have a generic type parameter if we don't obey it anyway and use casts. The purpose of generics is to avoid casts; if they do not avoid casts, they seem pointless.
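Below is a minimal sketch of the "two objects" approach. It assumes a simplified, hypothetical `SimpleQueryResult<R>` class (not Kafka's actual `QueryResult`) and a caller-supplied `Function<byte[], V>` in place of a real serde. The inner layer's raw result and the upper layer's typed result are distinct objects, so no unchecked cast is needed:

```java
import java.util.function.Function;

// Hypothetical, simplified stand-in for a query result; NOT Kafka's QueryResult.
final class SimpleQueryResult<R> {
    private final R result;       // set only on success
    private final String failure; // set only on failure

    private SimpleQueryResult(final R result, final String failure) {
        this.result = result;
        this.failure = failure;
    }

    static <T> SimpleQueryResult<T> forResult(final T result) {
        return new SimpleQueryResult<>(result, null);
    }

    static <T> SimpleQueryResult<T> forFailure(final String failure) {
        return new SimpleQueryResult<>(null, failure);
    }

    boolean isSuccess() {
        return failure == null;
    }

    R getResult() {
        return result;
    }

    String getFailure() {
        return failure;
    }
}

final class ResultTranslation {
    private ResultTranslation() { }

    // Builds a new, properly typed result for the upper layer from the inner
    // layer's byte[] result; each object keeps its own type parameter.
    static <V> SimpleQueryResult<V> deserialize(final SimpleQueryResult<byte[]> raw,
                                                final Function<byte[], V> deserializer) {
        if (raw.isSuccess()) {
            return SimpleQueryResult.forResult(deserializer.apply(raw.getResult()));
        }
        // A failure carries no value, so only the failure information is copied.
        return SimpleQueryResult.forFailure(raw.getFailure());
    }
}
```

The cost is one extra allocation per query. In exchange, the compiler checks the types instead of the code suppressing `unchecked` warnings.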
[GitHub] [kafka] mjsax commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
mjsax commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771733023

## File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
## @@ -197,6 +197,18 @@ public R getResult() {
         return result;
     }

+    @SuppressWarnings("unchecked")
+    public QueryResult swapResult(final V value) {
+        if (isFailure()) {
+            return (QueryResult) this;

Review comment:
> I suppose we could throw an IllegalStateException, since the caller probably shouldn't be even trying to "swap" the result on a failed result to begin with, but this seems fine, too.

If there is no strict reason not to throw an `IllegalStateException`, I would strongly advocate throwing. It not only guards against potential bugs, but also expresses the semantics for developers (i.e., us) much more clearly and makes the code easier to read and reason about.
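Below is a minimal sketch of the fail-fast variant. It again uses a hypothetical, simplified result type (`SwappableResult`, not Kafka's `QueryResult`). Swapping the value of a failed result throws `IllegalStateException` instead of returning an unchecked cast of `this`:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified result type; NOT Kafka's QueryResult.
final class SwappableResult<R> {
    private final R result;
    private final String failureMessage; // null means success
    private final List<String> executionInfo = new ArrayList<>();

    private SwappableResult(final R result, final String failureMessage) {
        this.result = result;
        this.failureMessage = failureMessage;
    }

    static <T> SwappableResult<T> forResult(final T result) {
        return new SwappableResult<>(result, null);
    }

    static <T> SwappableResult<T> forFailure(final String failureMessage) {
        return new SwappableResult<>(null, failureMessage);
    }

    boolean isFailure() {
        return failureMessage != null;
    }

    R getResult() {
        return result;
    }

    void addExecutionInfo(final String message) {
        executionInfo.add(message);
    }

    List<String> getExecutionInfo() {
        return Collections.unmodifiableList(executionInfo);
    }

    // Returns a new result holding `value` plus this result's execution info.
    <V> SwappableResult<V> swapResult(final V value) {
        if (isFailure()) {
            // Swapping the value of a failed result indicates a caller bug, so fail fast.
            throw new IllegalStateException(
                "Cannot swap the result of a failed query: " + failureMessage);
        }
        final SwappableResult<V> swapped = forResult(value);
        swapped.executionInfo.addAll(executionInfo);
        return swapped;
    }
}
```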
[GitHub] [kafka] mjsax commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
mjsax commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771738427

## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
## @@ -42,16 +102,21 @@ private StoreQueryUtils() {
         final int partition
     ) {
-        final QueryResult result;
         final long start = collectExecutionInfo ? System.nanoTime() : -1L;
-        if (query instanceof PingQuery) {
-            if (!isPermitted(position, positionBound, partition)) {
-                result = QueryResult.notUpToBound(position, positionBound, partition);
-            } else {
-                result = (QueryResult) QueryResult.forResult(true);
-            }
-        } else {
+        final QueryResult result;
+
+        final QueryHandler handler = QUERY_HANDLER_MAP.get(query.getClass());

Review comment:
🤔
[GitHub] [kafka] mjsax commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
mjsax commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771738126

## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
## @@ -16,18 +16,78 @@
  */
 package org.apache.kafka.streams.state.internals;

+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.api.RecordMetadata;
+import org.apache.kafka.streams.query.FailureReason;
+import org.apache.kafka.streams.query.KeyQuery;
 import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.query.PositionBound;
 import org.apache.kafka.streams.query.Query;
 import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueStore;

+import java.io.PrintWriter;
+import java.io.StringWriter;
 import java.util.Map;

+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
 public final class StoreQueryUtils {

+    /**
+     * a utility interface to facilitate stores' query dispatch logic,
+     * allowing them to generically store query execution logic as the values
+     * in a map.
+     */
+    @FunctionalInterface
+    public interface QueryHandler {
+        QueryResult apply(
+            final Query query,
+            final PositionBound positionBound,
+            final boolean collectExecutionInfo,
+            final StateStore store
+        );
+    }
+
+
+    @SuppressWarnings("unchecked")
+    private static final Map, QueryHandler> QUERY_HANDLER_MAP =

Review comment:
Not sure I fully understand, but this might be less important.
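The `QueryHandler` map in the diff is a dispatch table keyed by query class. Below is a minimal, self-contained sketch of that pattern. All the types (`SimpleQuery`, `PingQuerySketch`, `KeyQuerySketch`, `QueryHandlerSketch`, `QueryDispatcher`) are hypothetical stand-ins, not Kafka classes, and a `Map<String, String>` plays the role of the store:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical query types; NOT Kafka's Query, PingQuery, or KeyQuery.
interface SimpleQuery { }

final class PingQuerySketch implements SimpleQuery { }

final class KeyQuerySketch implements SimpleQuery {
    final String key;

    KeyQuerySketch(final String key) {
        this.key = key;
    }
}

// Same idea as a QueryHandler: one lambda per query type.
@FunctionalInterface
interface QueryHandlerSketch {
    Object apply(SimpleQuery query, Map<String, String> store);
}

final class QueryDispatcher {
    // Keyed by the exact runtime class; subclasses are not matched.
    private static final Map<Class<?>, QueryHandlerSketch> HANDLERS = new HashMap<>();

    static {
        HANDLERS.put(PingQuerySketch.class, (query, store) -> true);
        HANDLERS.put(KeyQuerySketch.class,
            (query, store) -> store.get(((KeyQuerySketch) query).key));
    }

    private QueryDispatcher() { }

    static Object dispatch(final SimpleQuery query, final Map<String, String> store) {
        final QueryHandlerSketch handler = HANDLERS.get(query.getClass());
        if (handler == null) {
            // Unknown query types get a descriptive result instead of an exception.
            return "UNKNOWN_QUERY_TYPE: " + query.getClass().getName();
        }
        return handler.apply(query, store);
    }
}
```

Adding a new query type means registering one more lambda, with no change to the `if`/`else` chain. The trade-off is the cast inside each handler.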
[GitHub] [kafka] mjsax commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
mjsax commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771737792

## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
## @@ -16,18 +16,78 @@
  */
 package org.apache.kafka.streams.state.internals;

+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.api.RecordMetadata;
+import org.apache.kafka.streams.query.FailureReason;
+import org.apache.kafka.streams.query.KeyQuery;
 import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.query.PositionBound;
 import org.apache.kafka.streams.query.Query;
 import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueStore;

+import java.io.PrintWriter;
+import java.io.StringWriter;
 import java.util.Map;

+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
 public final class StoreQueryUtils {

+    /**
+     * a utility interface to facilitate stores' query dispatch logic,
+     * allowing them to generically store query execution logic as the values
+     * in a map.
+     */
+    @FunctionalInterface

Review comment:
I see. So we should add it elsewhere, too (of course not as part of the IQ work).
[GitHub] [kafka] mjsax commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
mjsax commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771737576

## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
## @@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener listener,
         return false;
     }

+    @SuppressWarnings("unchecked")
+    @Override
+    public QueryResult query(final Query query,
+                             final PositionBound positionBound,
+                             final boolean collectExecutionInfo) {
+
+        final long start = System.nanoTime();
+        final QueryResult result;
+
+        final QueryHandler handler = queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, collectExecutionInfo);
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " in " + (System.nanoTime() - start) + "ns");
+            }
+        } else {
+            result = (QueryResult) handler.apply(
+                query,
+                positionBound,
+                collectExecutionInfo,
+                this
+            );
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " with serdes "
+                        + serdes + " in " + (System.nanoTime() - start) + "ns");
+            }
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    private QueryResult runKeyQuery(final Query query,
+        final PositionBound positionBound, final boolean collectExecutionInfo) {
+        final QueryResult result;
+        final KeyQuery typedQuery = (KeyQuery) query;
+        final KeyQuery rawKeyQuery = KeyQuery.withKey(keyBytes(typedQuery.getKey()));
+        final QueryResult rawResult =
+            wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo);
+        if (rawResult.isSuccess()) {
+            final boolean timestamped = WrappedStateStore.isTimestamped(wrapped());

Review comment:
> Also, because we always wrap the non-timestamped store with the KeyValueToTimestampedKeyValueByteStoreAdapter, we also always pass through the MeteredTimestampedKeyValue store whether the inner store is really timestamped or not.

I don't think so. We only do this in the DSL, not in the PAPI.
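Below is a hypothetical sketch of the DSL/PAPI distinction; none of these names are Kafka classes. A DSL-style path wraps a plain store in an adapter that pads values with a placeholder timestamp. A PAPI-style path registers whatever store it is given, unchanged. The 8-byte timestamp prefix and the `-1` placeholder are assumptions made for illustration:

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

// Hypothetical byte store abstraction; NOT Kafka's KeyValueStore.
interface BytesStoreSketch {
    byte[] get(String key);
}

// Marker for stores that already return timestamped values.
interface TimestampedSketch { }

final class PlainBytesStore implements BytesStoreSketch {
    final Map<String, byte[]> data = new HashMap<>();

    @Override
    public byte[] get(final String key) {
        return data.get(key);
    }
}

// Pads plain values as <8-byte timestamp><value>, using -1 as a placeholder timestamp.
final class PaddingAdapter implements BytesStoreSketch, TimestampedSketch {
    private final BytesStoreSketch inner;

    PaddingAdapter(final BytesStoreSketch inner) {
        this.inner = inner;
    }

    @Override
    public byte[] get(final String key) {
        final byte[] plain = inner.get(key);
        if (plain == null) {
            return null;
        }
        return ByteBuffer.allocate(Long.BYTES + plain.length).putLong(-1L).put(plain).array();
    }
}

final class StoreWiring {
    private StoreWiring() { }

    // DSL-style path: non-timestamped stores are wrapped in the padding adapter.
    static BytesStoreSketch forDsl(final BytesStoreSketch store) {
        return store instanceof TimestampedSketch ? store : new PaddingAdapter(store);
    }

    // PAPI-style path: the user's store is used as-is.
    static BytesStoreSketch forPapi(final BytesStoreSketch store) {
        return store;
    }
}
```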
[GitHub] [kafka] mjsax commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
mjsax commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771737277

## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
## @@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener listener,
         return false;
     }

+    @SuppressWarnings("unchecked")
+    @Override
+    public QueryResult query(final Query query,
+                             final PositionBound positionBound,
+                             final boolean collectExecutionInfo) {
+
+        final long start = System.nanoTime();
+        final QueryResult result;
+
+        final QueryHandler handler = queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, collectExecutionInfo);
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " in " + (System.nanoTime() - start) + "ns");
+            }
+        } else {
+            result = (QueryResult) handler.apply(
+                query,
+                positionBound,
+                collectExecutionInfo,
+                this
+            );
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " with serdes "
+                        + serdes + " in " + (System.nanoTime() - start) + "ns");
+            }
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    private QueryResult runKeyQuery(final Query query,
+        final PositionBound positionBound, final boolean collectExecutionInfo) {
+        final QueryResult result;
+        final KeyQuery typedQuery = (KeyQuery) query;
+        final KeyQuery rawKeyQuery = KeyQuery.withKey(keyBytes(typedQuery.getKey()));
+        final QueryResult rawResult =
+            wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo);
+        if (rawResult.isSuccess()) {
+            final boolean timestamped = WrappedStateStore.isTimestamped(wrapped());
+            final Serde vSerde = serdes.valueSerde();
+            final Deserializer deserializer;
+            if (!timestamped && vSerde instanceof ValueAndTimestampSerde) {

Review comment:
> The MeteredStore's serde is always a ValueAndTimestamp serde regardless of whether the inner store is Timestamped or not.

Is it?
(1) We also have `MeteredTimestampStore` (of course it extends `MeteredStore`), but it seems better to split the logic and move everything timestamp-related into `MeteredTimestampStore`. (2) PAPI users can add a plain `KeyValueStore`. We won't wrap it as a `TimestampedStore`, and the serdes won't be `ValueAndTimestamp` either.

> What we do is, when you have a non-timestamped store, we wrap it with an extra layer (org.apache.kafka.streams.state.internals.KeyValueToTimestampedKeyValueByteStoreAdapter) that pads the returned values with a fake timestamp (org.apache.kafka.streams.state.TimestampedBytesStore#convertToTimestampedFormat)

We only do this in the DSL, if the user gives us a non-timestamped store via `Materialized`. For PAPI users, we never do this; we use whatever store is given to us as-is.

> so we did not implement the same padding logic for non-timestamped data and instead just bubble up to the MeteredStore

Not sure I follow. Shouldn't this be irrelevant for IQ? Also, the current conversion between plain and timestamped stores is really just a corner case. It is also one we want to deprecate anyway; we just have not found a way to do so yet. Maybe at some point we should add a runtime check that WARNs users if they provide a non-timestamped store, until we remove support for it and throw an exception instead. It does not seem worth adding more tech debt for behavior that we only added to avoid breaking stuff.

> Which means that if we want to deserialize it, we need to know whether to use the ValueAndTimestamp deserializer or just the Value's deserializer.

Yes, but we should split this logic between the plain `MeteredStore` and the `MeteredTimestampStore`.
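Below is a minimal sketch of the split suggested above. It makes three assumptions: hypothetical class names (`PlainMeteredStoreSketch` and `TimestampedMeteredStoreSketch`, not Kafka's metered store classes), UTF-8 strings instead of real serdes, and an 8-byte timestamp prefix as the timestamped value format. The plain store only knows the value deserializer. The timestamped subclass overrides a single hook, so no `instanceof` check on the serde is needed:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical plain metered store; NOT Kafka's MeteredKeyValueStore.
class PlainMeteredStoreSketch {
    // Plain stores return raw value bytes, so only the value deserializer is needed.
    String deserializeValue(final byte[] rawValue) {
        return new String(rawValue, StandardCharsets.UTF_8);
    }

    // Shared query path; the value format is decided by the overridable hook.
    final String runKeyQuery(final byte[] rawValueFromInnerStore) {
        return rawValueFromInnerStore == null ? null : deserializeValue(rawValueFromInnerStore);
    }
}

// Hypothetical timestamped subclass; NOT Kafka's timestamped metered store.
class TimestampedMeteredStoreSketch extends PlainMeteredStoreSketch {
    // Assumed format: <8-byte timestamp><value bytes>; rendered as "value@timestamp".
    @Override
    String deserializeValue(final byte[] rawValue) {
        final ByteBuffer buffer = ByteBuffer.wrap(rawValue);
        final long timestamp = buffer.getLong();
        final byte[] value = new byte[buffer.remaining()];
        buffer.get(value);
        return new String(value, StandardCharsets.UTF_8) + "@" + timestamp;
    }
}
```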
[GitHub] [kafka] mjsax commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
mjsax commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771734218

## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
## @@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener listener,
         return false;
     }

+    @SuppressWarnings("unchecked")
+    @Override
+    public QueryResult query(final Query query,
+                             final PositionBound positionBound,
+                             final boolean collectExecutionInfo) {
+
+        final long start = System.nanoTime();
+        final QueryResult result;
+
+        final QueryHandler handler = queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, collectExecutionInfo);
+            if (collectExecutionInfo) {

Review comment:
We can do this, but would it be better to say "passed down" instead of "handled" in the message?
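Below is one way to word the execution info so that delegated queries can be told apart from locally handled ones. `ExecutionInfoMessages.describe` is a hypothetical helper, not part of Kafka. It also reports the elapsed time with "within" and a trailing period:

```java
// Hypothetical helper; NOT part of Kafka. Builds an execution-info line that says
// whether this layer handled the query itself or only passed it down.
final class ExecutionInfoMessages {
    private ExecutionInfoMessages() { }

    static String describe(final String storeName,
                           final boolean handledLocally,
                           final long elapsedNanos) {
        final String action = handledLocally ? "Handled in " : "Passed down by ";
        return action + storeName + " within " + elapsedNanos + "ns.";
    }
}
```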
[GitHub] [kafka] mjsax commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
mjsax commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771733605

## File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
## @@ -197,6 +197,18 @@ public R getResult() {
         return result;
     }

+    @SuppressWarnings("unchecked")
+    public QueryResult swapResult(final V value) {
+        if (isFailure()) {
+            return (QueryResult) this;
+        } else {
+            final QueryResult result = new QueryResult<>(value);

Review comment:
Why not use two objects?
[GitHub] [kafka] mjsax commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
mjsax commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r770076908

## File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
## @@ -197,6 +197,18 @@ public R getResult() {
         return result;
     }

+    @SuppressWarnings("unchecked")
+    public QueryResult swapResult(final V value) {
+        if (isFailure()) {
+            return (QueryResult) this;

Review comment:
Why do we not allow swapping the result if the current result is a failure? And if we don't want to allow swapping, why just return `this` instead of throwing an exception?

## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
## @@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener listener,
         return false;
     }

+    @SuppressWarnings("unchecked")
+    @Override
+    public QueryResult query(final Query query,
+                             final PositionBound positionBound,
+                             final boolean collectExecutionInfo) {
+
+        final long start = System.nanoTime();
+        final QueryResult result;
+
+        final QueryHandler handler = queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, collectExecutionInfo);
+            if (collectExecutionInfo) {

Review comment:
Does it make sense to add execution info in this case? We pushed the query down but did not handle it, so the inner store would track it (and we should not)?
## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
## @@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener listener,
         return false;
     }

+    @SuppressWarnings("unchecked")
+    @Override
+    public QueryResult query(final Query query,
+                             final PositionBound positionBound,
+                             final boolean collectExecutionInfo) {
+
+        final long start = System.nanoTime();
+        final QueryResult result;
+
+        final QueryHandler handler = queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, collectExecutionInfo);
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " in " + (System.nanoTime() - start) + "ns");
+            }
+        } else {
+            result = (QueryResult) handler.apply(
+                query,
+                positionBound,
+                collectExecutionInfo,
+                this
+            );
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " with serdes "
+                        + serdes + " in " + (System.nanoTime() - start) + "ns");

Review comment:
nit: `in` -> `within` (also: maybe add `.` at the end?)
## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
## @@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener listener,
         return false;
     }

+    @SuppressWarnings("unchecked")
+    @Override
+    public QueryResult query(final Query query,
+                             final PositionBound positionBound,
+                             final boolean collectExecutionInfo) {
+
+        final long start = System.nanoTime();
+        final QueryResult result;
+
+        final QueryHandler handler = queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, collectExecutionInfo);
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " in " + (System.nanoTime() - start) + "ns");
+            }
+        } else {
+            result = (QueryResult) handler.apply(
+                query,
+                positionBound,
+                collectExecutionInfo,
+                this
+            );
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " with serdes "
+                        + serdes + " in " + (System.nanoTime() - start) + "ns");
+            }
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    private QueryResult runKeyQuery(final Query query,
+        final PositionBound positionBound, final boolean collectExecutionInfo) {
+        final QueryResult result;
+        final KeyQuery typedQuery = (KeyQuery) query;
+        final KeyQuery rawKeyQuery = KeyQuery.withKey(keyBytes(typedQuery.getKey()));
+        final QueryResult rawResult =
+            wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo);
+        if (rawResult.isSuccess()) {
+            final boolean timestamped = WrappedState