UladzislauBlok commented on code in PR #21684:
URL: https://github.com/apache/kafka/pull/21684#discussion_r2904477355
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java:
##########
@@ -119,11 +116,13 @@ public class MeteredKeyValueStore<K, V>
)
);
- MeteredKeyValueStore(final KeyValueStore<Bytes, byte[]> inner,
- final String metricsScope,
- final Time time,
- final Serde<K> keySerde,
- final Serde<V> valueSerde) {
+ MeteredKeyValueStore(
+ final KeyValueStore<Bytes, byte[]> inner,
+ final String metricsScope,
+ final Time time,
+ final Serde<K> keySerde,
+ final Serde<V> valueSerde
+ ) {
Review Comment:
### Offtopic
Question about formatting fix: do we have consistent code style across code
base? I mean literally cli formatted or ide setting
I was able to find this:
https://kafka.apache.org/community/developer/#streams-api , but I don't think
that's enough to keep code style consistent
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java:
##########
@@ -102,12 +102,12 @@ public ValueTimestampHeaders<V> putIfAbsent(final K key,
public <R> QueryResult<R> query(final Query<R> query,
final PositionBound positionBound,
final QueryConfig config) {
- throw new UnsupportedOperationException("Queries (IQv2) are not
supported for timestamped key-value stores with headers yet.");
+ throw new UnsupportedOperationException("Querying is not supported for
" + getClass().getSimpleName());
}
@Override
public Position getPosition() {
- throw new UnsupportedOperationException("Position is not supported by
timestamped key-value stores with headers yet.");
+ throw new UnsupportedOperationException("Position is not supported for
" + getClass().getSimpleName());
}
protected Bytes keyBytes(final K key, final Headers headers) {
Review Comment:
Do we still need this method, if we already have it from parent store?
see
https://github.com/apache/kafka/pull/21684/changes#diff-9af87381ff50464fc0979726bd22231c747d941b3bb7a9c152ddbf430c61cf23R444
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java:
##########
@@ -145,7 +145,7 @@ private class MeteredVersionedKeyValueStoreInternal
public long put(final K key, final V value, final long timestamp) {
Objects.requireNonNull(key, "key cannot be null");
try {
- final long validTo = maybeMeasureLatency(() ->
inner.put(keyBytes(key), plainValueSerdes.rawValue(value), timestamp), time,
putSensor);
+ final long validTo = maybeMeasureLatency(() ->
inner.put(serializeKey(key), plainValueSerdes.rawValue(value), timestamp),
time, putSensor);
Review Comment:
`plainValueSerdes.rawValue(value)`. do we need headers there?
##########
streams/src/test/java/org/apache/kafka/streams/state/StateSerdesTest.java:
##########
@@ -135,10 +139,11 @@ public void
shouldSkipValueAndTimestampeInformationForErrorOnTimestampAndValueSe
@Test
public void shouldThrowIfIncompatibleSerdeForKey() throws
ClassNotFoundException {
+ @SuppressWarnings("rawtypes")
Review Comment:
out of curiosity: why `new RecordHeaders()` leads to
`@SuppressWarnings("rawtypes")`?
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java:
##########
@@ -420,18 +432,26 @@ public void close() {
}
}
- protected V outerValue(final byte[] value) {
- return value != null ? serdes.valueFrom(value, new RecordHeaders()) :
null;
+ protected byte[] serializeValue(final V value) {
+ return value != null ? serdes.rawValue(value,
internalContext.headers()) : null;
Review Comment:
IMHO. I think `new RecordHeaders` looks more correct (from backward
compatibility POV), but essentially that's okay to do `context.headers()`
because:
- If user doesn't use headers aware serializer, headers will be ignored
(ether empty or not)
- The idea of this ticket is to actually fix code base to propagate headers,
and if we can propagate original headers instead of "mocked" one we go for it
- It doesn't make any difference if users aren't using headers state stores
/ serializers, but helps us to consistently apply simple paradigm: if there is
a access to original headers -> go for it, if not fallback to `new
RecordHeaders`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]