mjsax commented on code in PR #21650:
URL: https://github.com/apache/kafka/pull/21650#discussion_r2898387352


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersWindowStoreAdapter.java:
##########
@@ -211,7 +214,26 @@ public <R> QueryResult<R> query(final Query<R> query,
                                     final PositionBound positionBound,
                                     final QueryConfig config) {
 
-        throw new UnsupportedOperationException("Queries (IQv2) are not supported for timestamped window stores with headers yet.");
+        final QueryResult<R> result = store.query(query, positionBound, config);

Review Comment:
   In https://github.com/apache/kafka/pull/21643 the code is set up differently: we first check the query type, and call `store.query(...)` only afterwards.
   
   I'm not sure which approach is right, but it seems best to align both PRs so they follow the same pattern.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeaders.java:
##########
@@ -83,4 +97,200 @@ public void put(final K key, final ValueTimestampHeaders<V> value, final long wi
     protected Bytes keyBytes(final K key, final Headers headers) {
         return Bytes.wrap(serdes.rawKey(key, headers));
     }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+                                    final PositionBound positionBound,
+                                    final QueryConfig config) {
+        final long start = time.nanoseconds();
+        final QueryResult<R> result;
+
+        if (query instanceof WindowKeyQuery) {
+            result = runWindowKeyQuery((WindowKeyQuery<K, ValueTimestampHeaders<V>>) query, positionBound, config);
+        } else if (query instanceof WindowRangeQuery) {
+            result = runWindowRangeQuery((WindowRangeQuery<K, ValueTimestampHeaders<V>>) query, positionBound, config);
+        } else {
+            result = wrapped().query(query, positionBound, config);
+        }
+
+        if (config.isCollectExecutionInfo()) {
+            final String conversionType = isUnderlyingStoreTimestamped()

Review Comment:
   Oh dear... More IQv2 mess that we have not cleaned up yet. We already fixed this for kv-stores by adding `KeyQuery` and `TimestampedKeyQuery`, so the user can tell us which result type they want, independent of the underlying store type. For the window case we don't have this yet :/



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersWindowStoreAdapter.java:
##########
@@ -211,7 +214,26 @@ public <R> QueryResult<R> query(final Query<R> query,
                                     final PositionBound positionBound,
                                     final QueryConfig config) {
 
-        throw new UnsupportedOperationException("Queries (IQv2) are not supported for timestamped window stores with headers yet.");
+        final QueryResult<R> result = store.query(query, positionBound, config);
+
+        if (!result.isSuccess()) {
+            return result;
+        }
+
+        // Wrap iterators to convert from timestamped format to header format
+        if (query instanceof WindowKeyQuery) {
+            final QueryResult<WindowStoreIterator<byte[]>> rawResult = (QueryResult<WindowStoreIterator<byte[]>>) result;
+            final WindowStoreIterator<byte[]> wrappedIterator = new TimestampedWindowToHeadersWindowStoreIteratorAdapter(rawResult.getResult());
+            return (QueryResult<R>) InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, wrappedIterator);
+        } else if (query instanceof WindowRangeQuery) {
+            final QueryResult<KeyValueIterator<Windowed<Bytes>, byte[]>> rawResult =
+                (QueryResult<KeyValueIterator<Windowed<Bytes>, byte[]>>) result;
+            final KeyValueIterator<Windowed<Bytes>, byte[]> wrappedIterator =
+                new TimestampedToHeadersIteratorAdapter<>(rawResult.getResult());
+            return (QueryResult<R>) InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, wrappedIterator);
+        }
+
+        return result;

Review Comment:
   Not sure if this is correct. For regular layers in the hierarchy, it's fine to forward an unsupported query type to the lower layers. An adapter is different: if it cannot translate the bytes for an unknown query type, then even if the lower layer supports the query, we would crash in the upper layer when deserializing, because the byte format does not match what the metered layer expects and the adapter did not fix it up.
   
   So it seems better to return `FailedQueryResult` for this case?
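   
   A minimal sketch of the suggested control flow, combined with the "check the query type first" pattern from the other PR. Every type here (`Query`, `Result`, `innerQuery`, `adapterQuery`, the `timestamped:`/`headers:` prefixes) is a hypothetical stand-in and not a Kafka Streams class; the sketch only models the dispatch logic, not the real byte formats or the real `QueryResult` API.

```java
// Hypothetical stand-ins, NOT the Kafka Streams classes; they only model the control flow.
class AdapterQuerySketch {
    interface Query { }
    static final class WindowKeyQuery implements Query { }
    static final class WindowRangeQuery implements Query { }
    static final class OtherQuery implements Query { }

    static final class Result {
        final boolean success;
        final String value; // payload on success, failure message otherwise

        private Result(final boolean success, final String value) {
            this.success = success;
            this.value = value;
        }

        static Result ok(final String payload) {
            return new Result(true, payload);
        }

        static Result failed(final String message) {
            return new Result(false, message);
        }
    }

    // Assumed markers for the two byte formats.
    static final String OLD_FORMAT = "timestamped:";
    static final String NEW_FORMAT = "headers:";

    // Lower layer: answers every query, always in the old format.
    static Result innerQuery(final Query query) {
        return Result.ok(OLD_FORMAT + query.getClass().getSimpleName());
    }

    // Adapter: check the query type first and call the inner store
    // only for types whose result it knows how to translate.
    static Result adapterQuery(final Query query) {
        if (query instanceof WindowKeyQuery || query instanceof WindowRangeQuery) {
            final Result raw = innerQuery(query);
            if (!raw.success) {
                return raw;
            }
            // Translate the old format into the format the upper layer expects.
            return Result.ok(NEW_FORMAT + raw.value.substring(OLD_FORMAT.length()));
        }
        // Unknown type: fail here instead of forwarding untranslated bytes upward.
        return Result.failed("Unknown query type: " + query.getClass().getSimpleName());
    }

    public static void main(final String[] args) {
        System.out.println(adapterQuery(new WindowKeyQuery()).value);
        System.out.println(adapterQuery(new OtherQuery()).value);
    }
}
```

   With this shape, an unknown query type never reaches the inner store through the adapter, so the upper layer cannot receive bytes in a format it does not expect.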



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersWindowStoreAdapter.java:
##########
@@ -211,7 +214,26 @@ public <R> QueryResult<R> query(final Query<R> query,
                                     final PositionBound positionBound,
                                     final QueryConfig config) {
 
-        throw new UnsupportedOperationException("Queries (IQv2) are not supported for timestamped window stores with headers yet.");

Review Comment:
   We should also add the "tracing" code, i.e., call `result.addExecutionInfo(...)`.
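   
   A rough sketch of what such tracing could look like, assuming each layer appends one message when execution info is requested. The `Result` class, the `collectExecutionInfo` flag, and the message texts are hypothetical stand-ins; only the method name `addExecutionInfo` is taken from the comment above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins, NOT the Kafka Streams classes; they only model per-layer tracing.
class ExecutionInfoSketch {
    static final class Result {
        final String value;
        final List<String> executionInfo = new ArrayList<>();

        Result(final String value) {
            this.value = value;
        }

        void addExecutionInfo(final String message) {
            executionInfo.add(message);
        }
    }

    // Lower layer: records its own timing if tracing is requested.
    static Result innerQuery(final boolean collectExecutionInfo) {
        final long start = System.nanoTime();
        final Result result = new Result("raw");
        if (collectExecutionInfo) {
            result.addExecutionInfo("Handled in inner store in " + (System.nanoTime() - start) + "ns");
        }
        return result;
    }

    // Adapter layer: appends its own entry after the inner store's entry.
    static Result adapterQuery(final boolean collectExecutionInfo) {
        final long start = System.nanoTime();
        final Result result = innerQuery(collectExecutionInfo);
        if (collectExecutionInfo) {
            result.addExecutionInfo("Handled in adapter in " + (System.nanoTime() - start) + "ns");
        }
        return result;
    }

    public static void main(final String[] args) {
        for (final String line : adapterQuery(true).executionInfo) {
            System.out.println(line);
        }
    }
}
```

   The entries accumulate bottom-up, so the adapter's line appears after the inner store's line, and nothing is recorded when the flag is off.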



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
