UladzislauBlok commented on code in PR #21684:
URL: https://github.com/apache/kafka/pull/21684#discussion_r2904477355


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java:
##########
@@ -119,11 +116,13 @@ public class MeteredKeyValueStore<K, V>
             )
         );
 
-    MeteredKeyValueStore(final KeyValueStore<Bytes, byte[]> inner,
-                         final String metricsScope,
-                         final Time time,
-                         final Serde<K> keySerde,
-                         final Serde<V> valueSerde) {
+    MeteredKeyValueStore(
+        final KeyValueStore<Bytes, byte[]> inner,
+        final String metricsScope,
+        final Time time,
+        final Serde<K> keySerde,
+        final Serde<V> valueSerde
+    ) {

Review Comment:
   ### Offtopic
   
   A question about the formatting fix: do we enforce a consistent code style across the code base, for example through a CLI formatter or shared IDE settings?
   The only thing I was able to find is https://kafka.apache.org/community/developer/#streams-api , but I don't think that alone is enough to keep the code style consistent.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java:
##########
@@ -102,12 +102,12 @@ public ValueTimestampHeaders<V> putIfAbsent(final K key,
     public <R> QueryResult<R> query(final Query<R> query,
                                     final PositionBound positionBound,
                                     final QueryConfig config) {
-        throw new UnsupportedOperationException("Queries (IQv2) are not supported for timestamped key-value stores with headers yet.");
+        throw new UnsupportedOperationException("Querying is not supported for " + getClass().getSimpleName());
     }
 
     @Override
     public Position getPosition() {
-        throw new UnsupportedOperationException("Position is not supported by timestamped key-value stores with headers yet.");
+        throw new UnsupportedOperationException("Position is not supported for " + getClass().getSimpleName());
     }
 
     protected Bytes keyBytes(final K key, final Headers headers) {

Review Comment:
   Do we still need this method now that the parent store already provides it?
   See https://github.com/apache/kafka/pull/21684/changes#diff-9af87381ff50464fc0979726bd22231c747d941b3bb7a9c152ddbf430c61cf23R444



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java:
##########
@@ -145,7 +145,7 @@ private class MeteredVersionedKeyValueStoreInternal
         public long put(final K key, final V value, final long timestamp) {
             Objects.requireNonNull(key, "key cannot be null");
             try {
-                final long validTo = maybeMeasureLatency(() -> inner.put(keyBytes(key), plainValueSerdes.rawValue(value), timestamp), time, putSensor);
+                final long validTo = maybeMeasureLatency(() -> inner.put(serializeKey(key), plainValueSerdes.rawValue(value), timestamp), time, putSensor);

Review Comment:
   `plainValueSerdes.rawValue(value)`: do we need to pass headers here as well?



##########
streams/src/test/java/org/apache/kafka/streams/state/StateSerdesTest.java:
##########
@@ -135,10 +139,11 @@ public void shouldSkipValueAndTimestampeInformationForErrorOnTimestampAndValueSe
 
     @Test
     public void shouldThrowIfIncompatibleSerdeForKey() throws ClassNotFoundException {
+        @SuppressWarnings("rawtypes")

Review Comment:
   Out of curiosity: why does `new RecordHeaders()` lead to `@SuppressWarnings("rawtypes")`?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java:
##########
@@ -420,18 +432,26 @@ public void close() {
         }
     }
 
-    protected V outerValue(final byte[] value) {
-        return value != null ? serdes.valueFrom(value, new RecordHeaders()) : null;
+    protected byte[] serializeValue(final V value) {
+        return value != null ? serdes.rawValue(value, internalContext.headers()) : null;

Review Comment:
   IMHO, `new RecordHeaders()` looks more correct from a backward-compatibility point of view, but using `context.headers()` is essentially fine because:
   - If the user doesn't use a headers-aware serializer, the headers are ignored (whether empty or not).
   - The idea of this ticket is to fix the code base so that headers are propagated, and where we can propagate the original headers instead of "mocked" ones, we should.
   - It makes no difference for users who aren't using headers-aware state stores or serializers, but it lets us consistently apply a simple paradigm: if the original headers are accessible, use them; if not, fall back to `new RecordHeaders()`.
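
To make the paradigm above concrete, here is a minimal, self-contained sketch. It deliberately does not use Kafka's classes: `HeaderFallbackSketch`, `HeaderAwareSerializer`, and `headersOrEmpty` are hypothetical stand-ins, and a plain `Map<String, String>` plays the role of the record headers. It only illustrates the "use the original headers if available, otherwise fall back to empty ones" rule, not the actual change in this PR.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-ins only: none of these types are Kafka APIs.
final class HeaderFallbackSketch {

    /** Simplified serializer that receives headers and may or may not use them. */
    interface HeaderAwareSerializer<V> {
        byte[] serialize(V value, Map<String, String> headers);
    }

    /** Use the original headers when the context has them, else an empty placeholder. */
    static Map<String, String> headersOrEmpty(final Map<String, String> contextHeaders) {
        return contextHeaders != null ? contextHeaders : new LinkedHashMap<>();
    }

    /** Null values stay null; everything else is serialized with the chosen headers. */
    static <V> byte[] serializeValue(final V value,
                                     final Map<String, String> contextHeaders,
                                     final HeaderAwareSerializer<V> serializer) {
        return value != null ? serializer.serialize(value, headersOrEmpty(contextHeaders)) : null;
    }

    public static void main(final String[] args) {
        final Map<String, String> original = new LinkedHashMap<>();
        original.put("tenant", "acme");

        // A serializer that ignores headers: the output is the same either way.
        final HeaderAwareSerializer<String> plain =
            (v, h) -> v.getBytes(StandardCharsets.UTF_8);
        // A serializer that reads a header: only this one sees a difference.
        final HeaderAwareSerializer<String> aware =
            (v, h) -> (h.getOrDefault("tenant", "none") + ":" + v).getBytes(StandardCharsets.UTF_8);

        System.out.println(new String(serializeValue("v1", original, plain), StandardCharsets.UTF_8));
        System.out.println(new String(serializeValue("v1", null, plain), StandardCharsets.UTF_8));
        System.out.println(new String(serializeValue("v1", original, aware), StandardCharsets.UTF_8));
        System.out.println(new String(serializeValue("v1", null, aware), StandardCharsets.UTF_8));
    }
}
```

In this sketch the header-ignoring serializer produces identical bytes with or without the original headers, which is the backward-compatibility argument from the first bullet; only the header-reading serializer observes the propagated values.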



