mjsax commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771737277
##########
File path:
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
return false;
}
+ @SuppressWarnings("unchecked")
+ @Override
+ public <R> QueryResult<R> query(final Query<R> query,
+ final PositionBound positionBound,
+ final boolean collectExecutionInfo) {
+
+ final long start = System.nanoTime();
+ final QueryResult<R> result;
+
+ final QueryHandler handler = queryHandlers.get(query.getClass());
+ if (handler == null) {
+ result = wrapped().query(query, positionBound, collectExecutionInfo);
+ if (collectExecutionInfo) {
+ result.addExecutionInfo(
+ "Handled in " + getClass() + " in " + (System.nanoTime() - start) + "ns");
+ }
+ } else {
+ result = (QueryResult<R>) handler.apply(
+ query,
+ positionBound,
+ collectExecutionInfo,
+ this
+ );
+ if (collectExecutionInfo) {
+ result.addExecutionInfo(
+ "Handled in " + getClass() + " with serdes "
+ + serdes + " in " + (System.nanoTime() - start) + "ns");
+ }
+ }
+ return result;
+ }
+
+ @SuppressWarnings("unchecked")
+ private <R> QueryResult<R> runKeyQuery(final Query query,
+ final PositionBound positionBound, final boolean collectExecutionInfo) {
+ final QueryResult<R> result;
+ final KeyQuery<K, V> typedQuery = (KeyQuery<K, V>) query;
+ final KeyQuery<Bytes, byte[]> rawKeyQuery = KeyQuery.withKey(keyBytes(typedQuery.getKey()));
+ final QueryResult<byte[]> rawResult =
+ wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo);
+ if (rawResult.isSuccess()) {
+ final boolean timestamped = WrappedStateStore.isTimestamped(wrapped());
+ final Serde<V> vSerde = serdes.valueSerde();
+ final Deserializer<V> deserializer;
+ if (!timestamped && vSerde instanceof ValueAndTimestampSerde) {
Review comment:
> The MeteredStore's serde is always a ValueAndTimestamp serde
regardless of whether the inner store is Timestamped or not.
Is it? (1) We also have `MeteredTimestampStore` (which of course extends
`MeteredStore`), and it seems better to split the logic and move everything
timestamp-related into `MeteredTimestampStore`. (2) PAPI users can add a
plain `KeyValueStore`; we won't wrap it with the `TimestampedStore` face,
and the serdes won't be `ValueAndTimestamp` either.
> What we do is, when you have a non-timestamped store, we wrap it with an
extra layer
(org.apache.kafka.streams.state.internals.KeyValueToTimestampedKeyValueByteStoreAdapter)
that pads the returned values with a fake timestamp
(org.apache.kafka.streams.state.TimestampedBytesStore#convertToTimestampedFormat)
We only do this in the DSL, if the user gives us a non-timestamped store via
`Materialized`. For PAPI users we never do this; we use whatever store is
given to us as-is.
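
To make the padding concrete, here is a minimal, self-contained sketch of what prefixing a plain value with a placeholder timestamp could look like. The class and method names are hypothetical, and the 8-byte prefix with `-1` as the "no timestamp" sentinel is an assumption about the layout, not a copy of `convertToTimestampedFormat`.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical illustration; not the Kafka Streams implementation.
public class TimestampPaddingSketch {

    // Assumed sentinel meaning "no real timestamp is known".
    static final long NO_TIMESTAMP = -1L;

    // Prefix a plain serialized value with an 8-byte placeholder timestamp.
    static byte[] padWithFakeTimestamp(final byte[] plainValue) {
        if (plainValue == null) {
            return null;
        }
        return ByteBuffer.allocate(Long.BYTES + plainValue.length)
            .putLong(NO_TIMESTAMP)
            .put(plainValue)
            .array();
    }

    // Read the timestamp prefix back out of a padded value.
    static long timestampOf(final byte[] padded) {
        return ByteBuffer.wrap(padded).getLong();
    }

    // Drop the timestamp prefix to recover the plain value bytes.
    static byte[] plainValueOf(final byte[] padded) {
        return Arrays.copyOfRange(padded, Long.BYTES, padded.length);
    }

    public static void main(final String[] args) {
        final byte[] padded = padWithFakeTimestamp(new byte[] {1, 2, 3});
        System.out.println(padded.length + " bytes, timestamp " + timestampOf(padded));
    }
}
```

The point for this discussion is that a reader of the padded bytes has to know the prefix is there; a plain value deserializer applied to the padded array would see eight extra leading bytes.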
> so we did not implement the same padding logic for non-timestamped data
and instead just bubble up to the MeteredStore
I'm not sure I follow. It should not be a concern for IQ, should it? Also,
the current conversion between plain and timestamped stores is really just a
corner case, and one that we want to deprecate anyway; we just have not found
a way to do so. Maybe we should add a runtime check at some point and WARN
users if they provide a non-timestamped store, until we remove support for it
and throw an exception instead. It does not seem worth adding more tech debt
for behavior that we only added to avoid breaking things.
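
A rough sketch of the runtime check floated above, purely as an illustration. All types and names here are hypothetical stand-ins (a marker interface plays the role of "this store is timestamped"), and the `failOnPlainStore` flag is an assumed way to model the later switch from warning to exception.

```java
import java.util.logging.Logger;

// Hypothetical illustration; these are not Kafka Streams types.
public class StoreFormatCheckSketch {

    // Stand-in for a byte store supplied by the user.
    interface ByteStore { }

    // Stand-in marker for stores that keep timestamps with their values.
    interface TimestampedByteStore extends ByteStore { }

    private static final Logger LOG =
        Logger.getLogger(StoreFormatCheckSketch.class.getName());

    // Returns true for timestamped stores. For plain stores it either warns
    // and returns false (deprecation phase) or throws (after removal).
    static boolean checkTimestamped(final ByteStore store, final boolean failOnPlainStore) {
        if (store instanceof TimestampedByteStore) {
            return true;
        }
        if (failOnPlainStore) {
            throw new IllegalArgumentException("Non-timestamped stores are not supported");
        }
        LOG.warning("Non-timestamped store provided; support for this is deprecated");
        return false;
    }

    public static void main(final String[] args) {
        System.out.println(checkTimestamped(new TimestampedByteStore() { }, false));
        System.out.println(checkTimestamped(new ByteStore() { }, false));
    }
}
```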
> Which means that if we want to deserialize it, we need to know whether to
use the ValueAndTimestamp deserializer or just the Value's deserializer.
Yes, but we should split this logic between the plain `MeteredStore` and the
`MeteredTimestampStore`.
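
One possible shape for that split, as a self-contained sketch with hypothetical stand-in classes (none of these are the real Kafka Streams types). It assumes the timestamped format is an 8-byte timestamp followed by the plain value bytes. The plain class contains no timestamp logic and no `instanceof` check; the timestamped subclass supplies the timestamp-aware deserializer.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Hypothetical stand-ins; none of these are the real Kafka Streams classes.
public class MeteredSplitSketch {

    // Simple holder for a value and its timestamp.
    static final class StampedValue<V> {
        final V value;
        final long timestamp;

        StampedValue(final V value, final long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // Plain store: uses the deserializer it was given, with no timestamp logic.
    static class PlainMeteredStore<V> {
        private final Function<byte[], V> valueDeserializer;

        PlainMeteredStore(final Function<byte[], V> valueDeserializer) {
            this.valueDeserializer = valueDeserializer;
        }

        V deserialize(final byte[] raw) {
            return raw == null ? null : valueDeserializer.apply(raw);
        }
    }

    // Timestamped store: everything timestamp-related lives here.
    static class TimestampedMeteredStore<V> extends PlainMeteredStore<StampedValue<V>> {

        TimestampedMeteredStore(final Function<byte[], V> inner) {
            super(stamped(inner));
        }

        // Assumed layout: 8-byte timestamp, then the plain value bytes.
        private static <T> Function<byte[], StampedValue<T>> stamped(final Function<byte[], T> inner) {
            return raw -> {
                final ByteBuffer buffer = ByteBuffer.wrap(raw);
                final long timestamp = buffer.getLong();
                final byte[] rest = new byte[buffer.remaining()];
                buffer.get(rest);
                return new StampedValue<>(inner.apply(rest), timestamp);
            };
        }
    }

    public static void main(final String[] args) {
        final Function<byte[], String> utf8 = b -> new String(b, StandardCharsets.UTF_8);
        final byte[] hi = "hi".getBytes(StandardCharsets.UTF_8);
        final byte[] raw = ByteBuffer.allocate(Long.BYTES + hi.length).putLong(42L).put(hi).array();
        final StampedValue<String> result = new TimestampedMeteredStore<String>(utf8).deserialize(raw);
        System.out.println(result.value + " @ " + result.timestamp);
    }
}
```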