mjsax commented on code in PR #21643:
URL: https://github.com/apache/kafka/pull/21643#discussion_r2893839057
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeaders.java:
##########
@@ -173,8 +173,7 @@ public long approximateNumEntries() {
public <R> QueryResult<R> query(final Query<R> query,
final PositionBound positionBound,
final QueryConfig config) {
-
- throw new UnsupportedOperationException("Queries (IQv2) are not supported by timestamped key-value stores with headers yet.");
+ return wrapped().query(query, positionBound, config);
Review Comment:
As we extend `Wrapped` store, I think it's best to remove the whole override
and reuse the implementation inherited from `Wrapped` store.
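
A minimal sketch of why the override is redundant. The types here (`Store`, `WrappedStore`, `HeadersStore`) are hypothetical stand-ins, not the real Kafka classes; the only assumption carried over from the diff is that the base wrapper already forwards `query(...)` to `wrapped()`.

```java
public class InheritedDelegation {

    /** Hypothetical stand-in for a state store that can answer queries. */
    interface Store {
        String query(String query);
    }

    /** Hypothetical stand-in for the wrapper base class: it already forwards query(). */
    static class WrappedStore implements Store {
        private final Store inner;

        WrappedStore(final Store inner) {
            this.inner = inner;
        }

        Store wrapped() {
            return inner;
        }

        @Override
        public String query(final String query) {
            return wrapped().query(query);
        }
    }

    /** Subclass with no query() override of its own: the forwarding is inherited. */
    static class HeadersStore extends WrappedStore {
        HeadersStore(final Store inner) {
            super(inner);
        }
    }

    static String demo() {
        final Store inner = q -> "inner:" + q;
        return new HeadersStore(inner).query("k1");
    }

    public static void main(final String[] args) {
        System.out.println(demo());
    }
}
```

An override whose body is only `return wrapped().query(query, positionBound, config);` behaves the same as having no override at all, so deleting it leaves one implementation to maintain.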
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java:
##########
@@ -86,7 +120,7 @@ public void put(final K key,
@Override
public ValueTimestampHeaders<V> putIfAbsent(final K key,
- final ValueTimestampHeaders<V> value) {
+ final ValueTimestampHeaders<V> value) {
Review Comment:
TY!
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java:
##########
@@ -152,11 +148,4 @@ private void verifyAndCloseEmptyDefaultColumnFamily(final ColumnFamilyHandle col
}
}
- @Override
- public <R> QueryResult<R> query(final Query<R> query,
- final PositionBound positionBound,
- final QueryConfig config) {
- throw new UnsupportedOperationException("Queries (IQv2) are not supported for timestamped key-value stores with headers yet.");
- }
Review Comment:
Yes, but thinking about it, throwing an exception is not right anyway, is it?
We should rather return `QueryResult.forUnknownQueryType(query, store)` and add
the boilerplate code for "tracing" and "position" handling?
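
A rough sketch of the shape this suggestion describes: report an unknown query type as a failed result instead of throwing, then attach execution info and a position. `SketchResult` and its fields are hypothetical stand-ins, not Kafka's `QueryResult`; modelling the position as a single `long` and naming the store `"headers-store"` are assumptions made only to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.List;

public class UnknownQuerySketch {

    /** Hypothetical stand-in for a query result; not Kafka's QueryResult. */
    static final class SketchResult {
        final boolean success;
        final String failureMessage;
        final List<String> executionInfo = new ArrayList<>();
        long position = -1L; // assumption: position modelled as one offset

        private SketchResult(final boolean success, final String failureMessage) {
            this.success = success;
            this.failureMessage = failureMessage;
        }

        /** Builds a failed result rather than throwing an exception. */
        static SketchResult forUnknownQueryType(final Object query, final String storeName) {
            return new SketchResult(
                false,
                "Store " + storeName + " cannot handle " + query.getClass().getSimpleName());
        }
    }

    static SketchResult query(final Object query,
                              final boolean collectExecutionInfo,
                              final long storePosition) {
        final long start = System.nanoTime();
        final SketchResult result = SketchResult.forUnknownQueryType(query, "headers-store");
        // "Tracing": record timing only when the caller asked for it.
        if (collectExecutionInfo) {
            result.executionInfo.add(
                "Handled in " + UnknownQuerySketch.class + " in " + (System.nanoTime() - start) + "ns");
        }
        // "Position": report how far the store had progressed when it answered.
        result.position = storePosition;
        return result;
    }

    public static void main(final String[] args) {
        final SketchResult result = query("not-a-known-query", true, 42L);
        System.out.println(result.success + " / " + result.failureMessage);
    }
}
```

With this shape the caller can inspect the failure on the result object and still sees execution info and a position, which an `UnsupportedOperationException` would not provide.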
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersStoreAdapter.java:
##########
@@ -152,7 +152,7 @@ public boolean isOpen() {
public <R> QueryResult<R> query(final Query<R> query,
final PositionBound positionBound,
final QueryConfig config) {
- throw new UnsupportedOperationException("Queries (IQv2) are not supported for timestamped key-value stores with headers yet.");
+ return store.query(query, positionBound, config);
Review Comment:
If this is an adapter, do we need to translate between types here?
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java:
##########
@@ -98,16 +132,259 @@ public ValueTimestampHeaders<V> putIfAbsent(final K key,
return currentValue;
}
+ /**
+ * Executes a query against this store.
+ *
+ * <p>Note: Query results do NOT include headers, even though headers are
+ * preserved in the underlying store. This behavior provides compatibility
+ * with existing IQv2 APIs that operate on timestamped stores.
+ *
+ * @param query the query to execute
+ * @param positionBound the position bound
+ * @param config the query configuration
+ * @return the query result
+ */
+
+ @SuppressWarnings("unchecked")
@Override
public <R> QueryResult<R> query(final Query<R> query,
final PositionBound positionBound,
final QueryConfig config) {
- throw new UnsupportedOperationException("Queries (IQv2) are not supported for timestamped key-value stores with headers yet.");
+
+ final long start = time.nanoseconds();
+ final QueryResult<R> result;
+
+ final StoreQueryUtils.QueryHandler handler = queryHandlers.get(query.getClass());
+ if (handler == null) {
+ result = wrapped().query(query, positionBound, config);
+ if (config.isCollectExecutionInfo()) {
+ result.addExecutionInfo(
+ "Handled in " + getClass() + " in " + (time.nanoseconds() - start) + "ns");
+ }
+ } else {
+ result = (QueryResult<R>) handler.apply(
+ query,
+ positionBound,
+ config,
+ this
+ );
+ if (config.isCollectExecutionInfo()) {
+ result.addExecutionInfo("Handled in " + getClass() + " with serdes "
+ + serdes + " in " + (time.nanoseconds() - start) + "ns");
+ }
+ }
+ return result;
}
- @Override
- public Position getPosition() {
- throw new UnsupportedOperationException("Position is not supported by timestamped key-value stores with headers yet.");
+ @SuppressWarnings("unchecked")
+ private <R> QueryResult<R> runKeyQuery(final Query<R> query,
+ final PositionBound positionBound,
+ final QueryConfig config) {
+ final QueryResult<R> result;
+ final KeyQuery<K, V> typedKeyQuery = (KeyQuery<K, V>) query;
+ final KeyQuery<Bytes, byte[]> rawKeyQuery =
+ KeyQuery.withKey(keyBytes(typedKeyQuery.getKey(), new RecordHeaders()));
+ final QueryResult<byte[]> rawResult =
+ wrapped().query(rawKeyQuery, positionBound, config);
+ if (rawResult.isSuccess()) {
+ final Function<byte[], ValueTimestampHeaders<V>> deserializer = StoreQueryUtils.deserializeValue(serdes, wrapped());
+ final ValueTimestampHeaders<V> valueTimestampHeaders = deserializer.apply(rawResult.getResult());
+ final V plainValue = valueTimestampHeaders == null ? null : valueTimestampHeaders.value();
+ final QueryResult<V> typedQueryResult =
+ InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, plainValue);
+ result = (QueryResult<R>) typedQueryResult;
+ } else {
+ // the generic type doesn't matter, since failed queries have no result set.
+ result = (QueryResult<R>) rawResult;
+ }
+ return result;
+ }
+
+ @SuppressWarnings("unchecked")
+ private <R> QueryResult<R> runTimestampedKeyQuery(final Query<R> query,
+ final PositionBound positionBound,
+ final QueryConfig config) {
+ final QueryResult<R> result;
+ final TimestampedKeyQuery<K, V> typedKeyQuery = (TimestampedKeyQuery<K, V>) query;
+ final KeyQuery<Bytes, byte[]> rawKeyQuery =
+ KeyQuery.withKey(keyBytes(typedKeyQuery.key(), new RecordHeaders()));
+ final QueryResult<byte[]> rawResult =
+ wrapped().query(rawKeyQuery, positionBound, config);
+ if (rawResult.isSuccess()) {
+ final Function<byte[], ValueTimestampHeaders<V>> deserializer = StoreQueryUtils.deserializeValue(serdes, wrapped());
+ final ValueTimestampHeaders<V> valueTimestampHeaders = deserializer.apply(rawResult.getResult());
+ // Convert ValueTimestampHeaders to ValueAndTimestamp for the result
+ final ValueAndTimestamp<V> valueAndTimestamp = valueTimestampHeaders == null
+ ? null
+ : ValueAndTimestamp.make(valueTimestampHeaders.value(), valueTimestampHeaders.timestamp());
+ final QueryResult<ValueAndTimestamp<V>> typedQueryResult =
+ InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, valueAndTimestamp);
+ result = (QueryResult<R>) typedQueryResult;
+ } else {
+ // the generic type doesn't matter, since failed queries have no result set.
+ result = (QueryResult<R>) rawResult;
+ }
+ return result;
+ }
+
+ @SuppressWarnings("unchecked")
+ private <R> QueryResult<R> runRangeQuery(final Query<R> query,
+ final PositionBound positionBound,
+ final QueryConfig config) {
+
+ final QueryResult<R> result;
+ final RangeQuery<K, V> typedQuery = (RangeQuery<K, V>) query;
+ RangeQuery<Bytes, byte[]> rawRangeQuery;
+ final ResultOrder order = typedQuery.resultOrder();
+ rawRangeQuery = RangeQuery.withRange(
+ keyBytes(typedQuery.getLowerBound().orElse(null), new RecordHeaders()),
+ keyBytes(typedQuery.getUpperBound().orElse(null), new RecordHeaders())
+ );
+ if (order.equals(ResultOrder.DESCENDING)) {
+ rawRangeQuery = rawRangeQuery.withDescendingKeys();
+ }
+ if (order.equals(ResultOrder.ASCENDING)) {
+ rawRangeQuery = rawRangeQuery.withAscendingKeys();
+ }
+ final QueryResult<KeyValueIterator<Bytes, byte[]>> rawResult =
+ wrapped().query(rawRangeQuery, positionBound, config);
+ if (rawResult.isSuccess()) {
+ final KeyValueIterator<Bytes, byte[]> iterator = rawResult.getResult();
+ final KeyValueIterator<K, V> resultIterator = new MeteredTimestampedKeyValueStoreWithHeadersIterator(
+ iterator,
+ getSensor,
+ StoreQueryUtils.deserializeValue(serdes, wrapped()),
+ true
+ );
+ final QueryResult<KeyValueIterator<K, V>> typedQueryResult =
+ InternalQueryResultUtil.copyAndSubstituteDeserializedResult(
+ rawResult,
+ resultIterator
+ );
+ result = (QueryResult<R>) typedQueryResult;
+ } else {
+ // the generic type doesn't matter, since failed queries have no result set.
+ result = (QueryResult<R>) rawResult;
+ }
+ return result;
+ }
+
+ @SuppressWarnings("unchecked")
+ private <R> QueryResult<R> runTimestampedRangeQuery(final Query<R> query,
+ final PositionBound positionBound,
+ final QueryConfig config) {
+
+ final QueryResult<R> result;
+ final TimestampedRangeQuery<K, V> typedQuery = (TimestampedRangeQuery<K, V>) query;
+ RangeQuery<Bytes, byte[]> rawRangeQuery;
+ final ResultOrder order = typedQuery.resultOrder();
+ rawRangeQuery = RangeQuery.withRange(
+ keyBytes(typedQuery.lowerBound().orElse(null), new RecordHeaders()),
+ keyBytes(typedQuery.upperBound().orElse(null), new RecordHeaders())
+ );
+ if (order.equals(ResultOrder.DESCENDING)) {
+ rawRangeQuery = rawRangeQuery.withDescendingKeys();
+ }
+ if (order.equals(ResultOrder.ASCENDING)) {
+ rawRangeQuery = rawRangeQuery.withAscendingKeys();
+ }
+ final QueryResult<KeyValueIterator<Bytes, byte[]>> rawResult =
+ wrapped().query(rawRangeQuery, positionBound, config);
+ if (rawResult.isSuccess()) {
+ final KeyValueIterator<Bytes, byte[]> iterator = rawResult.getResult();
+ final KeyValueIterator<K, ValueAndTimestamp<V>> resultIterator =
+ (KeyValueIterator<K, ValueAndTimestamp<V>>) new MeteredTimestampedKeyValueStoreWithHeadersIterator(
+ iterator,
+ getSensor,
+ StoreQueryUtils.deserializeValue(serdes, wrapped()),
+ false
+ );
+ final QueryResult<KeyValueIterator<K, ValueAndTimestamp<V>>> typedQueryResult =
+ InternalQueryResultUtil.copyAndSubstituteDeserializedResult(
+ rawResult,
+ resultIterator
+ );
+ result = (QueryResult<R>) typedQueryResult;
+ } else {
+ // the generic type doesn't matter, since failed queries have no result set.
+ result = (QueryResult<R>) rawResult;
+ }
+ return result;
+ }
+
+ @SuppressWarnings("unchecked")
+ private class MeteredTimestampedKeyValueStoreWithHeadersIterator implements KeyValueIterator<K, V>, MeteredIterator {
+
+ private final KeyValueIterator<Bytes, byte[]> iter;
+ private final Sensor sensor;
+ private final long startNs;
+ private final long startTimestampMs;
+ private final Function<byte[], ValueTimestampHeaders<V>> valueTimestampHeadersDeserializer;
+
+ private final boolean returnPlainValue;
+
+ private MeteredTimestampedKeyValueStoreWithHeadersIterator(final KeyValueIterator<Bytes, byte[]> iter,
+ final Sensor sensor,
+ final Function<byte[], ValueTimestampHeaders<V>> valueTimestampHeadersDeserializer,
+ final boolean returnPlainValue) {
+ this.iter = iter;
+ this.sensor = sensor;
+ this.valueTimestampHeadersDeserializer = valueTimestampHeadersDeserializer;
+ this.startNs = time.nanoseconds();
+ this.startTimestampMs = time.milliseconds();
+ this.returnPlainValue = returnPlainValue;
+ openIterators.add(this);
+ }
+
+ @Override
+ public long startTimestamp() {
+ return startTimestampMs;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return iter.hasNext();
+ }
+
+ @Override
+ public KeyValue<K, V> next() {
+ final KeyValue<Bytes, byte[]> keyValue = iter.next();
+ final ValueTimestampHeaders<V> valueTimestampHeaders = valueTimestampHeadersDeserializer.apply(keyValue.value);
+ final Headers headers = valueTimestampHeaders != null ? valueTimestampHeaders.headers() : new RecordHeaders();
+ if (returnPlainValue) {
+ final V plainValue = valueTimestampHeaders == null ? null : valueTimestampHeaders.value();
+ return KeyValue.pair(
+ serdes.keyFrom(keyValue.key.get(), headers),
+ plainValue
+ );
+ } else {
+ // Return as ValueAndTimestamp
+ final ValueAndTimestamp<V> valueAndTimestamp = valueTimestampHeaders == null
+ ? null
+ : ValueAndTimestamp.make(valueTimestampHeaders.value(), valueTimestampHeaders.timestamp());
+ return KeyValue.pair(
+ serdes.keyFrom(keyValue.key.get(), headers),
+ (V) valueAndTimestamp
Review Comment:
This is messy... but it's the same in `MeteredTimestampedKeyValueStore`.
Let's keep it this way. I'll do a follow-up PR to fix this after this PR is
merged.
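
For context on what makes the `(V) valueAndTimestamp` cast messy, here is one possible way to avoid it, offered only as an illustration and not as the planned follow-up. Instead of a `boolean returnPlainValue` flag and an unchecked cast, the iterator is parameterized by its result type and takes a mapping function. All types (`StoredValue`, `ValueAndTs`, `MappingIterator`) are hypothetical stand-ins; headers, sensors, and key deserialization are left out.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.function.Function;

public class TypedIteratorSketch {

    /** Hypothetical stand-in for ValueTimestampHeaders (headers omitted). */
    static final class StoredValue<V> {
        final V value;
        final long timestamp;

        StoredValue(final V value, final long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    /** Hypothetical stand-in for ValueAndTimestamp. */
    static final class ValueAndTs<V> {
        final V value;
        final long timestamp;

        ValueAndTs(final V value, final long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    /** Iterator typed by its result R; the mapper decides the result shape. */
    static final class MappingIterator<V, R> implements Iterator<R> {
        private final Iterator<StoredValue<V>> inner;
        private final Function<StoredValue<V>, R> mapper;

        MappingIterator(final Iterator<StoredValue<V>> inner,
                        final Function<StoredValue<V>, R> mapper) {
            this.inner = inner;
            this.mapper = mapper;
        }

        @Override
        public boolean hasNext() {
            return inner.hasNext();
        }

        @Override
        public R next() {
            final StoredValue<V> stored = inner.next();
            return stored == null ? null : mapper.apply(stored);
        }
    }

    /** Range-query shape: plain values only. */
    static <V> Iterator<V> plainValues(final Iterator<StoredValue<V>> inner) {
        return new MappingIterator<V, V>(inner, s -> s.value);
    }

    /** Timestamped-range-query shape: value plus timestamp, no cast needed. */
    static <V> Iterator<ValueAndTs<V>> timestampedValues(final Iterator<StoredValue<V>> inner) {
        return new MappingIterator<V, ValueAndTs<V>>(inner, s -> new ValueAndTs<V>(s.value, s.timestamp));
    }

    public static void main(final String[] args) {
        final Iterator<String> plain =
            plainValues(Arrays.asList(new StoredValue<String>("a", 5L)).iterator());
        System.out.println(plain.next());
    }
}
```

The trade-off is an extra type parameter on the iterator in exchange for compile-time checking of both result shapes.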