mjsax commented on code in PR #14570: URL: https://github.com/apache/kafka/pull/14570#discussion_r1396762731
########## streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java: ########## @@ -102,4 +139,216 @@ static class RawAndDeserializedValue<ValueType> { this.value = value; } } + + @SuppressWarnings("unchecked") + @Override + public <R> QueryResult<R> query(final Query<R> query, + final PositionBound positionBound, + final QueryConfig config) { + + final long start = time.nanoseconds(); + final QueryResult<R> result; + + final StoreQueryUtils.QueryHandler handler = queryHandlers.get(query.getClass()); + if (handler == null) { + result = wrapped().query(query, positionBound, config); + if (config.isCollectExecutionInfo()) { + result.addExecutionInfo( + "Handled in " + getClass() + " in " + (time.nanoseconds() - start) + "ns"); + } + } else { + result = (QueryResult<R>) handler.apply( + query, + positionBound, + config, + this + ); + if (config.isCollectExecutionInfo()) { + result.addExecutionInfo( + "Handled in " + getClass() + " with serdes " + + serdes + " in " + (time.nanoseconds() - start) + "ns"); + } + } + return result; + } + + + + @SuppressWarnings("unchecked") + private <R> QueryResult<R> runTimestampedKeyQuery(final Query<R> query, + final PositionBound positionBound, Review Comment: nit: fix indention ########## streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java: ########## @@ -127,9 +136,25 @@ public <R> QueryResult<R> query( final QueryConfig config) { + final long start = config.isCollectExecutionInfo() ? 
System.nanoTime() : -1L; - final QueryResult<R> result = store.query(query, positionBound, config); + QueryResult<R> result = store.query(query, positionBound, config); + + if (result.isSuccess()) { + if (query instanceof KeyQuery || query instanceof TimestampedKeyQuery) { + final byte[] plainValue = (byte[]) result.getResult(); + final byte[] valueWithTimestamp = convertToTimestampedFormat(plainValue); + result = (QueryResult<R>) InternalQueryResultUtil.copyAndSubstituteDeserializedResult(result, valueWithTimestamp); + } else if (query instanceof RangeQuery || query instanceof TimestampedRangeQuery) { + final KeyValueToTimestampedKeyValueAdapterIterator wrappedRocksDBRangeIterator = new KeyValueToTimestampedKeyValueAdapterIterator((RocksDbIterator) result.getResult()); + result = (QueryResult<R>) InternalQueryResultUtil.copyAndSubstituteDeserializedResult(result, wrappedRocksDBRangeIterator); + } else { + throw new StreamsException("unsupported query type"); Review Comment: Let's change this to: ``` throw new IllegalArgumentException("Unsupported query type: " + query.getClass()); ``` ########## streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java: ########## @@ -102,4 +139,216 @@ static class RawAndDeserializedValue<ValueType> { this.value = value; } } + + @SuppressWarnings("unchecked") + @Override + public <R> QueryResult<R> query(final Query<R> query, + final PositionBound positionBound, + final QueryConfig config) { + + final long start = time.nanoseconds(); + final QueryResult<R> result; + + final StoreQueryUtils.QueryHandler handler = queryHandlers.get(query.getClass()); + if (handler == null) { + result = wrapped().query(query, positionBound, config); + if (config.isCollectExecutionInfo()) { + result.addExecutionInfo( + "Handled in " + getClass() + " in " + (time.nanoseconds() - start) + "ns"); + } + } else { + result = (QueryResult<R>) handler.apply( + query, + positionBound, + config, + this + ); + if 
(config.isCollectExecutionInfo()) { + result.addExecutionInfo( + "Handled in " + getClass() + " with serdes " + + serdes + " in " + (time.nanoseconds() - start) + "ns"); + } + } + return result; + } + + + + @SuppressWarnings("unchecked") + private <R> QueryResult<R> runTimestampedKeyQuery(final Query<R> query, + final PositionBound positionBound, + final QueryConfig config) { + final QueryResult<R> result; + final TimestampedKeyQuery<K, V> typedKeyQuery = (TimestampedKeyQuery<K, V>) query; + final KeyQuery<Bytes, byte[]> rawKeyQuery = + KeyQuery.withKey(keyBytes(typedKeyQuery.key())); + final QueryResult<byte[]> rawResult = + wrapped().query(rawKeyQuery, positionBound, config); + if (rawResult.isSuccess()) { + final Function<byte[], ValueAndTimestamp<V>> deserializer = getDeserializeValue(serdes, wrapped()); + final ValueAndTimestamp<V> valueAndTimestamp = deserializer.apply(rawResult.getResult()); + final QueryResult<ValueAndTimestamp<V>> typedQueryResult = + InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, valueAndTimestamp); + result = (QueryResult<R>) typedQueryResult; + } else { + // the generic type doesn't matter, since failed queries have no result set. 
+ result = (QueryResult<R>) rawResult; + } + return result; + } + + @SuppressWarnings("unchecked") + private <R> QueryResult<R> runTimestampedRangeQuery(final Query<R> query, + final PositionBound positionBound, + final QueryConfig config) { + + final QueryResult<R> result; + final TimestampedRangeQuery<K, V> typedQuery = (TimestampedRangeQuery<K, V>) query; + RangeQuery<Bytes, byte[]> rawRangeQuery; + final boolean isKeyAscending = typedQuery.isKeyAscending(); + rawRangeQuery = RangeQuery.withRange( + keyBytes(typedQuery.lowerBound().orElse(null)), + keyBytes(typedQuery.upperBound().orElse(null)) + ); + if (!isKeyAscending) { + rawRangeQuery = rawRangeQuery.withDescendingKeys(); + } + final QueryResult<KeyValueIterator<Bytes, byte[]>> rawResult = + wrapped().query(rawRangeQuery, positionBound, config); + if (rawResult.isSuccess()) { + final KeyValueIterator<Bytes, byte[]> iterator = rawResult.getResult(); + final KeyValueIterator<K, ValueAndTimestamp<V>> resultIterator = (KeyValueIterator<K, ValueAndTimestamp<V>>) new MeteredTimestampedKeyValueStoreIterator( + iterator, + getSensor, + getDeserializeValue(serdes, wrapped()), + false + ); + final QueryResult<KeyValueIterator<K, ValueAndTimestamp<V>>> typedQueryResult = + InternalQueryResultUtil.copyAndSubstituteDeserializedResult( + rawResult, + resultIterator + ); + result = (QueryResult<R>) typedQueryResult; + } else { + // the generic type doesn't matter, since failed queries have no result set. 
+ result = (QueryResult<R>) rawResult; + } + return result; + } + + @SuppressWarnings("unchecked") + private <R> QueryResult<R> runKeyQuery(final Query<R> query, + final PositionBound positionBound, + final QueryConfig config) { + final QueryResult<R> result; + final KeyQuery<K, V> typedKeyQuery = (KeyQuery<K, V>) query; + final KeyQuery<Bytes, byte[]> rawKeyQuery = + KeyQuery.withKey(keyBytes(typedKeyQuery.getKey())); + final QueryResult<byte[]> rawResult = + wrapped().query(rawKeyQuery, positionBound, config); + if (rawResult.isSuccess()) { + final Function<byte[], ValueAndTimestamp<V>> deserializer = getDeserializeValue(serdes, wrapped()); + final ValueAndTimestamp<V> valueAndTimestamp = deserializer.apply(rawResult.getResult()); + final V plainValue = valueAndTimestamp == null ? null : valueAndTimestamp.value(); + final QueryResult<V> typedQueryResult = + InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, plainValue); + result = (QueryResult<R>) typedQueryResult; + } else { + // the generic type doesn't matter, since failed queries have no result set. 
+ result = (QueryResult<R>) rawResult; + } + return result; + } + + @SuppressWarnings("unchecked") + private <R> QueryResult<R> runRangeQuery(final Query<R> query, + final PositionBound positionBound, + final QueryConfig config) { + + final QueryResult<R> result; + final RangeQuery<K, V> typedQuery = (RangeQuery<K, V>) query; + RangeQuery<Bytes, byte[]> rawRangeQuery; + final boolean isKeyAscending = typedQuery.isKeyAscending(); + rawRangeQuery = RangeQuery.withRange( + keyBytes(typedQuery.getLowerBound().orElse(null)), + keyBytes(typedQuery.getUpperBound().orElse(null)) + ); + if (!isKeyAscending) { + rawRangeQuery = rawRangeQuery.withDescendingKeys(); + } + final QueryResult<KeyValueIterator<Bytes, byte[]>> rawResult = + wrapped().query(rawRangeQuery, positionBound, config); + if (rawResult.isSuccess()) { + final KeyValueIterator<Bytes, byte[]> iterator = rawResult.getResult(); + final KeyValueIterator<K, V> resultIterator = new MeteredTimestampedKeyValueStoreIterator( + iterator, + getSensor, + getDeserializeValue(serdes, wrapped()), + true + ); + final QueryResult<KeyValueIterator<K, V>> typedQueryResult = + InternalQueryResultUtil.copyAndSubstituteDeserializedResult( + rawResult, + resultIterator + ); + result = (QueryResult<R>) typedQueryResult; + } else { + // the generic type doesn't matter, since failed queries have no result set. 
+ result = (QueryResult<R>) rawResult; + } + return result; + } + + @SuppressWarnings("unchecked") + private class MeteredTimestampedKeyValueStoreIterator implements KeyValueIterator<K, V> { + + private final KeyValueIterator<Bytes, byte[]> iter; + private final Sensor sensor; + private final long startNs; + private final Function<byte[], ValueAndTimestamp<V>> valueDeserializer; + + private final boolean isKeyOrRangeQuery; + + private MeteredTimestampedKeyValueStoreIterator(final KeyValueIterator<Bytes, byte[]> iter, + final Sensor sensor, + final Function<byte[], ValueAndTimestamp<V>> valueDeserializer, + final boolean isKeyOrRangeQuery) { Review Comment: `isKeyOrRangeQuery` -> `returnPlainValue` For KeyQuery we don't create an iterator at all, and `returnPlainValue` is more descriptive ########## streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java: ########## @@ -102,4 +139,216 @@ static class RawAndDeserializedValue<ValueType> { this.value = value; } } + + @SuppressWarnings("unchecked") + @Override + public <R> QueryResult<R> query(final Query<R> query, + final PositionBound positionBound, + final QueryConfig config) { + + final long start = time.nanoseconds(); + final QueryResult<R> result; + + final StoreQueryUtils.QueryHandler handler = queryHandlers.get(query.getClass()); + if (handler == null) { + result = wrapped().query(query, positionBound, config); + if (config.isCollectExecutionInfo()) { + result.addExecutionInfo( + "Handled in " + getClass() + " in " + (time.nanoseconds() - start) + "ns"); + } + } else { + result = (QueryResult<R>) handler.apply( + query, + positionBound, + config, + this + ); + if (config.isCollectExecutionInfo()) { + result.addExecutionInfo( + "Handled in " + getClass() + " with serdes " + + serdes + " in " + (time.nanoseconds() - start) + "ns"); + } + } + return result; + } + + + + @SuppressWarnings("unchecked") + private <R> QueryResult<R> runTimestampedKeyQuery(final Query<R> query, 
+ final PositionBound positionBound, + final QueryConfig config) { + final QueryResult<R> result; + final TimestampedKeyQuery<K, V> typedKeyQuery = (TimestampedKeyQuery<K, V>) query; + final KeyQuery<Bytes, byte[]> rawKeyQuery = + KeyQuery.withKey(keyBytes(typedKeyQuery.key())); + final QueryResult<byte[]> rawResult = + wrapped().query(rawKeyQuery, positionBound, config); + if (rawResult.isSuccess()) { + final Function<byte[], ValueAndTimestamp<V>> deserializer = getDeserializeValue(serdes, wrapped()); + final ValueAndTimestamp<V> valueAndTimestamp = deserializer.apply(rawResult.getResult()); + final QueryResult<ValueAndTimestamp<V>> typedQueryResult = + InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, valueAndTimestamp); + result = (QueryResult<R>) typedQueryResult; + } else { + // the generic type doesn't matter, since failed queries have no result set. + result = (QueryResult<R>) rawResult; + } + return result; + } + + @SuppressWarnings("unchecked") + private <R> QueryResult<R> runTimestampedRangeQuery(final Query<R> query, + final PositionBound positionBound, + final QueryConfig config) { + + final QueryResult<R> result; + final TimestampedRangeQuery<K, V> typedQuery = (TimestampedRangeQuery<K, V>) query; + RangeQuery<Bytes, byte[]> rawRangeQuery; + final boolean isKeyAscending = typedQuery.isKeyAscending(); + rawRangeQuery = RangeQuery.withRange( + keyBytes(typedQuery.lowerBound().orElse(null)), + keyBytes(typedQuery.upperBound().orElse(null)) + ); + if (!isKeyAscending) { + rawRangeQuery = rawRangeQuery.withDescendingKeys(); + } + final QueryResult<KeyValueIterator<Bytes, byte[]>> rawResult = + wrapped().query(rawRangeQuery, positionBound, config); + if (rawResult.isSuccess()) { + final KeyValueIterator<Bytes, byte[]> iterator = rawResult.getResult(); + final KeyValueIterator<K, ValueAndTimestamp<V>> resultIterator = (KeyValueIterator<K, ValueAndTimestamp<V>>) new MeteredTimestampedKeyValueStoreIterator( + iterator, + getSensor, + 
getDeserializeValue(serdes, wrapped()), + false + ); + final QueryResult<KeyValueIterator<K, ValueAndTimestamp<V>>> typedQueryResult = + InternalQueryResultUtil.copyAndSubstituteDeserializedResult( + rawResult, + resultIterator + ); + result = (QueryResult<R>) typedQueryResult; + } else { + // the generic type doesn't matter, since failed queries have no result set. + result = (QueryResult<R>) rawResult; + } + return result; + } + + @SuppressWarnings("unchecked") + private <R> QueryResult<R> runKeyQuery(final Query<R> query, + final PositionBound positionBound, + final QueryConfig config) { + final QueryResult<R> result; + final KeyQuery<K, V> typedKeyQuery = (KeyQuery<K, V>) query; + final KeyQuery<Bytes, byte[]> rawKeyQuery = + KeyQuery.withKey(keyBytes(typedKeyQuery.getKey())); + final QueryResult<byte[]> rawResult = + wrapped().query(rawKeyQuery, positionBound, config); + if (rawResult.isSuccess()) { + final Function<byte[], ValueAndTimestamp<V>> deserializer = getDeserializeValue(serdes, wrapped()); + final ValueAndTimestamp<V> valueAndTimestamp = deserializer.apply(rawResult.getResult()); + final V plainValue = valueAndTimestamp == null ? null : valueAndTimestamp.value(); + final QueryResult<V> typedQueryResult = + InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, plainValue); + result = (QueryResult<R>) typedQueryResult; + } else { + // the generic type doesn't matter, since failed queries have no result set. 
+ result = (QueryResult<R>) rawResult; + } + return result; + } + + @SuppressWarnings("unchecked") + private <R> QueryResult<R> runRangeQuery(final Query<R> query, + final PositionBound positionBound, + final QueryConfig config) { + + final QueryResult<R> result; + final RangeQuery<K, V> typedQuery = (RangeQuery<K, V>) query; + RangeQuery<Bytes, byte[]> rawRangeQuery; + final boolean isKeyAscending = typedQuery.isKeyAscending(); + rawRangeQuery = RangeQuery.withRange( + keyBytes(typedQuery.getLowerBound().orElse(null)), + keyBytes(typedQuery.getUpperBound().orElse(null)) + ); + if (!isKeyAscending) { + rawRangeQuery = rawRangeQuery.withDescendingKeys(); + } + final QueryResult<KeyValueIterator<Bytes, byte[]>> rawResult = + wrapped().query(rawRangeQuery, positionBound, config); + if (rawResult.isSuccess()) { + final KeyValueIterator<Bytes, byte[]> iterator = rawResult.getResult(); + final KeyValueIterator<K, V> resultIterator = new MeteredTimestampedKeyValueStoreIterator( + iterator, + getSensor, + getDeserializeValue(serdes, wrapped()), + true + ); + final QueryResult<KeyValueIterator<K, V>> typedQueryResult = + InternalQueryResultUtil.copyAndSubstituteDeserializedResult( + rawResult, + resultIterator + ); + result = (QueryResult<R>) typedQueryResult; + } else { + // the generic type doesn't matter, since failed queries have no result set. 
+ result = (QueryResult<R>) rawResult; + } + return result; + } + + @SuppressWarnings("unchecked") + private class MeteredTimestampedKeyValueStoreIterator implements KeyValueIterator<K, V> { + + private final KeyValueIterator<Bytes, byte[]> iter; + private final Sensor sensor; + private final long startNs; + private final Function<byte[], ValueAndTimestamp<V>> valueDeserializer; + + private final boolean isKeyOrRangeQuery; + + private MeteredTimestampedKeyValueStoreIterator(final KeyValueIterator<Bytes, byte[]> iter, + final Sensor sensor, + final Function<byte[], ValueAndTimestamp<V>> valueDeserializer, + final boolean isKeyOrRangeQuery) { + this.iter = iter; + this.sensor = sensor; + this.valueDeserializer = valueDeserializer; + this.startNs = time.nanoseconds(); + this.isKeyOrRangeQuery = isKeyOrRangeQuery; + } + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public KeyValue<K, V> next() { + final KeyValue<Bytes, byte[]> keyValue = iter.next(); + if (isKeyOrRangeQuery) { + final V value = valueDeserializer.apply(keyValue.value).value(); Review Comment: `value` -> `plainValue` ########## streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java: ########## @@ -247,26 +247,18 @@ public Position getPosition() { } @SuppressWarnings("unchecked") - protected <R> QueryResult<R> runRangeQuery(final Query<R> query, + private <R> QueryResult<R> runRangeQuery(final Query<R> query, final PositionBound positionBound, Review Comment: nit: adjust indention ########## streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java: ########## @@ -102,4 +139,216 @@ static class RawAndDeserializedValue<ValueType> { this.value = value; } } + + @SuppressWarnings("unchecked") + @Override + public <R> QueryResult<R> query(final Query<R> query, + final PositionBound positionBound, + final QueryConfig config) { + + final long start = time.nanoseconds(); + final QueryResult<R> 
result; + + final StoreQueryUtils.QueryHandler handler = queryHandlers.get(query.getClass()); + if (handler == null) { + result = wrapped().query(query, positionBound, config); + if (config.isCollectExecutionInfo()) { + result.addExecutionInfo( + "Handled in " + getClass() + " in " + (time.nanoseconds() - start) + "ns"); + } + } else { + result = (QueryResult<R>) handler.apply( + query, + positionBound, + config, + this + ); + if (config.isCollectExecutionInfo()) { + result.addExecutionInfo( + "Handled in " + getClass() + " with serdes " + + serdes + " in " + (time.nanoseconds() - start) + "ns"); + } + } + return result; + } + + + + @SuppressWarnings("unchecked") + private <R> QueryResult<R> runTimestampedKeyQuery(final Query<R> query, + final PositionBound positionBound, + final QueryConfig config) { + final QueryResult<R> result; + final TimestampedKeyQuery<K, V> typedKeyQuery = (TimestampedKeyQuery<K, V>) query; + final KeyQuery<Bytes, byte[]> rawKeyQuery = + KeyQuery.withKey(keyBytes(typedKeyQuery.key())); + final QueryResult<byte[]> rawResult = + wrapped().query(rawKeyQuery, positionBound, config); + if (rawResult.isSuccess()) { + final Function<byte[], ValueAndTimestamp<V>> deserializer = getDeserializeValue(serdes, wrapped()); + final ValueAndTimestamp<V> valueAndTimestamp = deserializer.apply(rawResult.getResult()); + final QueryResult<ValueAndTimestamp<V>> typedQueryResult = + InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, valueAndTimestamp); + result = (QueryResult<R>) typedQueryResult; + } else { + // the generic type doesn't matter, since failed queries have no result set. 
+ result = (QueryResult<R>) rawResult; + } + return result; + } + + @SuppressWarnings("unchecked") + private <R> QueryResult<R> runTimestampedRangeQuery(final Query<R> query, + final PositionBound positionBound, + final QueryConfig config) { + + final QueryResult<R> result; + final TimestampedRangeQuery<K, V> typedQuery = (TimestampedRangeQuery<K, V>) query; + RangeQuery<Bytes, byte[]> rawRangeQuery; + final boolean isKeyAscending = typedQuery.isKeyAscending(); + rawRangeQuery = RangeQuery.withRange( + keyBytes(typedQuery.lowerBound().orElse(null)), + keyBytes(typedQuery.upperBound().orElse(null)) + ); + if (!isKeyAscending) { + rawRangeQuery = rawRangeQuery.withDescendingKeys(); + } + final QueryResult<KeyValueIterator<Bytes, byte[]>> rawResult = + wrapped().query(rawRangeQuery, positionBound, config); + if (rawResult.isSuccess()) { + final KeyValueIterator<Bytes, byte[]> iterator = rawResult.getResult(); + final KeyValueIterator<K, ValueAndTimestamp<V>> resultIterator = (KeyValueIterator<K, ValueAndTimestamp<V>>) new MeteredTimestampedKeyValueStoreIterator( + iterator, + getSensor, + getDeserializeValue(serdes, wrapped()), + false + ); + final QueryResult<KeyValueIterator<K, ValueAndTimestamp<V>>> typedQueryResult = + InternalQueryResultUtil.copyAndSubstituteDeserializedResult( + rawResult, + resultIterator + ); + result = (QueryResult<R>) typedQueryResult; + } else { + // the generic type doesn't matter, since failed queries have no result set. 
+ result = (QueryResult<R>) rawResult; + } + return result; + } + + @SuppressWarnings("unchecked") + private <R> QueryResult<R> runKeyQuery(final Query<R> query, + final PositionBound positionBound, + final QueryConfig config) { + final QueryResult<R> result; + final KeyQuery<K, V> typedKeyQuery = (KeyQuery<K, V>) query; + final KeyQuery<Bytes, byte[]> rawKeyQuery = + KeyQuery.withKey(keyBytes(typedKeyQuery.getKey())); + final QueryResult<byte[]> rawResult = + wrapped().query(rawKeyQuery, positionBound, config); + if (rawResult.isSuccess()) { + final Function<byte[], ValueAndTimestamp<V>> deserializer = getDeserializeValue(serdes, wrapped()); + final ValueAndTimestamp<V> valueAndTimestamp = deserializer.apply(rawResult.getResult()); + final V plainValue = valueAndTimestamp == null ? null : valueAndTimestamp.value(); + final QueryResult<V> typedQueryResult = + InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, plainValue); + result = (QueryResult<R>) typedQueryResult; + } else { + // the generic type doesn't matter, since failed queries have no result set. 
+ result = (QueryResult<R>) rawResult; + } + return result; + } + + @SuppressWarnings("unchecked") + private <R> QueryResult<R> runRangeQuery(final Query<R> query, + final PositionBound positionBound, + final QueryConfig config) { + + final QueryResult<R> result; + final RangeQuery<K, V> typedQuery = (RangeQuery<K, V>) query; + RangeQuery<Bytes, byte[]> rawRangeQuery; + final boolean isKeyAscending = typedQuery.isKeyAscending(); + rawRangeQuery = RangeQuery.withRange( + keyBytes(typedQuery.getLowerBound().orElse(null)), + keyBytes(typedQuery.getUpperBound().orElse(null)) + ); + if (!isKeyAscending) { + rawRangeQuery = rawRangeQuery.withDescendingKeys(); + } + final QueryResult<KeyValueIterator<Bytes, byte[]>> rawResult = + wrapped().query(rawRangeQuery, positionBound, config); + if (rawResult.isSuccess()) { + final KeyValueIterator<Bytes, byte[]> iterator = rawResult.getResult(); + final KeyValueIterator<K, V> resultIterator = new MeteredTimestampedKeyValueStoreIterator( + iterator, + getSensor, + getDeserializeValue(serdes, wrapped()), + true + ); + final QueryResult<KeyValueIterator<K, V>> typedQueryResult = + InternalQueryResultUtil.copyAndSubstituteDeserializedResult( + rawResult, + resultIterator + ); + result = (QueryResult<R>) typedQueryResult; + } else { + // the generic type doesn't matter, since failed queries have no result set. 
+ result = (QueryResult<R>) rawResult; + } + return result; + } + + @SuppressWarnings("unchecked") + private class MeteredTimestampedKeyValueStoreIterator implements KeyValueIterator<K, V> { + + private final KeyValueIterator<Bytes, byte[]> iter; + private final Sensor sensor; + private final long startNs; + private final Function<byte[], ValueAndTimestamp<V>> valueDeserializer; + + private final boolean isKeyOrRangeQuery; + + private MeteredTimestampedKeyValueStoreIterator(final KeyValueIterator<Bytes, byte[]> iter, + final Sensor sensor, + final Function<byte[], ValueAndTimestamp<V>> valueDeserializer, Review Comment: `valueDeserializer` -> `valueAndTimestampDeserializer` ########## streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java: ########## @@ -247,26 +247,18 @@ public Position getPosition() { } @SuppressWarnings("unchecked") - protected <R> QueryResult<R> runRangeQuery(final Query<R> query, + private <R> QueryResult<R> runRangeQuery(final Query<R> query, final PositionBound positionBound, final QueryConfig config) { final QueryResult<R> result; final RangeQuery<K, V> typedQuery = (RangeQuery<K, V>) query; RangeQuery<Bytes, byte[]> rawRangeQuery; final boolean isKeyAscending = typedQuery.isKeyAscending(); - if (typedQuery.getLowerBound().isPresent() && typedQuery.getUpperBound().isPresent()) { - rawRangeQuery = RangeQuery.withRange( - keyBytes(typedQuery.getLowerBound().get()), - keyBytes(typedQuery.getUpperBound().get()) - ); - } else if (typedQuery.getLowerBound().isPresent()) { - rawRangeQuery = RangeQuery.withLowerBound(keyBytes(typedQuery.getLowerBound().get())); - } else if (typedQuery.getUpperBound().isPresent()) { - rawRangeQuery = RangeQuery.withUpperBound(keyBytes(typedQuery.getUpperBound().get())); - } else { - rawRangeQuery = RangeQuery.withNoBounds(); - } + rawRangeQuery = RangeQuery.withRange( + keyBytes(typedQuery.getLowerBound().orElse(null)), + keyBytes(typedQuery.getUpperBound().orElse(null)) + ); Review 
Comment: Thanks for this cleanup. Very nice! ########## streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java: ########## @@ -195,17 +195,15 @@ public <R> QueryResult<R> query(final Query<R> query, final PositionBound positi return result; } - @Override - protected <R> QueryResult<R> runRangeQuery(final Query<R> query, - final PositionBound positionBound, - final QueryConfig config) { + private <R> QueryResult<R> runRangeQuery(final Query<R> query, + final PositionBound positionBound, + final QueryConfig config) { // throw exception for now to reserve the ability to implement this in the future // without clashing with users' custom implementations in the meantime throw new UnsupportedOperationException("Versioned stores do not support RangeQuery queries at this time."); } - @Override - protected <R> QueryResult<R> runKeyQuery(final Query<R> query, + private <R> QueryResult<R> runKeyQuery(final Query<R> query, final PositionBound positionBound, Review Comment: nit: adjust indention ########## streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java: ########## @@ -293,7 +285,7 @@ protected <R> QueryResult<R> runRangeQuery(final Query<R> query, } @SuppressWarnings("unchecked") - protected <R> QueryResult<R> runKeyQuery(final Query<R> query, + private <R> QueryResult<R> runKeyQuery(final Query<R> query, final PositionBound positionBound, Review Comment: adjust indention ########## streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java: ########## @@ -102,4 +139,216 @@ static class RawAndDeserializedValue<ValueType> { this.value = value; } } + + @SuppressWarnings("unchecked") + @Override + public <R> QueryResult<R> query(final Query<R> query, + final PositionBound positionBound, + final QueryConfig config) { + + final long start = time.nanoseconds(); + final QueryResult<R> result; + + final StoreQueryUtils.QueryHandler handler = 
queryHandlers.get(query.getClass()); + if (handler == null) { + result = wrapped().query(query, positionBound, config); + if (config.isCollectExecutionInfo()) { + result.addExecutionInfo( + "Handled in " + getClass() + " in " + (time.nanoseconds() - start) + "ns"); + } + } else { + result = (QueryResult<R>) handler.apply( + query, + positionBound, + config, + this + ); + if (config.isCollectExecutionInfo()) { + result.addExecutionInfo( + "Handled in " + getClass() + " with serdes " + + serdes + " in " + (time.nanoseconds() - start) + "ns"); + } + } + return result; + } + + + + @SuppressWarnings("unchecked") + private <R> QueryResult<R> runTimestampedKeyQuery(final Query<R> query, + final PositionBound positionBound, + final QueryConfig config) { + final QueryResult<R> result; + final TimestampedKeyQuery<K, V> typedKeyQuery = (TimestampedKeyQuery<K, V>) query; + final KeyQuery<Bytes, byte[]> rawKeyQuery = + KeyQuery.withKey(keyBytes(typedKeyQuery.key())); + final QueryResult<byte[]> rawResult = + wrapped().query(rawKeyQuery, positionBound, config); + if (rawResult.isSuccess()) { + final Function<byte[], ValueAndTimestamp<V>> deserializer = getDeserializeValue(serdes, wrapped()); Review Comment: With the fix to add the `TimestampedByteStore` marker interface to the adaptor class, I think we can simplify `getDeserializeValue(...)` -- as a matter of fact, it should become so simple that we might not need this helper any longer at all, and we can pull in the code into the callers? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org