mjsax commented on a change in pull request #10548: URL: https://github.com/apache/kafka/pull/10548#discussion_r619899204
##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -234,13 +247,15 @@ public V delete(final K key) {
     @Override
     public <PS extends Serializer<P>, P> KeyValueIterator<K, V> prefixScan(final P prefix, final PS prefixKeySerializer) {
-
+        Objects.requireNonNull(prefix, "key cannot be null");

Review comment:
The KafkaStreams runtime _always_ "wraps" any store with a corresponding `MeteredXxxStore` (cf. https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilder.java#L42-L49). Those `MeteredXxxStore`s do the translation from objects to bytes (i.e., they use the serdes) and also track state store metrics. (Note that stores provided to the runtime always have type `<Bytes, byte[]>`, while they are exposed to `Processors` as `<K, V>` types.)

Thus, when you call `context.stateStore(...)` you always get a `MeteredXxxStore` object; of course, those details are hidden behind the interface type.

This architecture allows us to unify code and separate concerns. In fact, it also allows us to add or remove more "layers": we can insert a "caching layer" (cf. https://kafka.apache.org/28/documentation/streams/developer-guide/memory-mgmt.html) and a "change logging layer" (both are inserted by default).
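
To make the layering concrete, here is a rough, self-contained sketch of the idea in plain Java. It is not the Kafka Streams code: `SimpleStore`, `InMemoryBytesStore`, `ChangeLoggingStore`, `MeteredStore`, and `newStringLongStore` are hypothetical names invented for illustration, `ByteBuffer` stands in for `Bytes` as the key type, a list stands in for the changelog topic, and the caching layer is omitted for brevity.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;

// Hypothetical, simplified model of the store layering; NOT the real Kafka Streams classes.
final class LayeredStoreSketch {

    /** Minimal store interface (assumption: the real KeyValueStore has many more methods). */
    interface SimpleStore<K, V> {
        void put(K key, V value);
        V get(K key);
    }

    /** Innermost layer: only ever sees serialized bytes. */
    static final class InMemoryBytesStore implements SimpleStore<ByteBuffer, byte[]> {
        private final Map<ByteBuffer, byte[]> data = new HashMap<>();
        @Override public void put(final ByteBuffer key, final byte[] value) { data.put(key, value); }
        @Override public byte[] get(final ByteBuffer key) { return data.get(key); }
    }

    /** Middle layer: records every write; the list is a stand-in for a changelog topic. */
    static final class ChangeLoggingStore implements SimpleStore<ByteBuffer, byte[]> {
        private final SimpleStore<ByteBuffer, byte[]> inner;
        final List<byte[]> changelog = new ArrayList<>();
        ChangeLoggingStore(final SimpleStore<ByteBuffer, byte[]> inner) { this.inner = inner; }
        @Override public void put(final ByteBuffer key, final byte[] value) {
            changelog.add(value);
            inner.put(key, value);
        }
        @Override public byte[] get(final ByteBuffer key) { return inner.get(key); }
    }

    /** Outermost layer: converts objects to bytes and counts calls as a toy "metric". */
    static final class MeteredStore<K, V> implements SimpleStore<K, V> {
        private final SimpleStore<ByteBuffer, byte[]> inner;
        private final Function<K, byte[]> keySerializer;
        private final Function<V, byte[]> valueSerializer;
        private final Function<byte[], V> valueDeserializer;
        long putCount;
        long getCount;

        MeteredStore(final SimpleStore<ByteBuffer, byte[]> inner,
                     final Function<K, byte[]> keySerializer,
                     final Function<V, byte[]> valueSerializer,
                     final Function<byte[], V> valueDeserializer) {
            this.inner = inner;
            this.keySerializer = keySerializer;
            this.valueSerializer = valueSerializer;
            this.valueDeserializer = valueDeserializer;
        }

        @Override public void put(final K key, final V value) {
            // Null check happens here because this is the layer that serializes the key.
            Objects.requireNonNull(key, "key cannot be null");
            putCount++;
            inner.put(ByteBuffer.wrap(keySerializer.apply(key)), valueSerializer.apply(value));
        }

        @Override public V get(final K key) {
            Objects.requireNonNull(key, "key cannot be null");
            getCount++;
            final byte[] raw = inner.get(ByteBuffer.wrap(keySerializer.apply(key)));
            return raw == null ? null : valueDeserializer.apply(raw);
        }
    }

    /** Wraps a bytes-level store in a typed, metered view for String keys and Long values. */
    static MeteredStore<String, Long> newStringLongStore(final SimpleStore<ByteBuffer, byte[]> inner) {
        return new MeteredStore<String, Long>(
            inner,
            s -> s.getBytes(StandardCharsets.UTF_8),
            v -> ByteBuffer.allocate(Long.BYTES).putLong(v).array(),
            b -> ByteBuffer.wrap(b).getLong());
    }

    public static void main(final String[] args) {
        final ChangeLoggingStore changeLogging = new ChangeLoggingStore(new InMemoryBytesStore());
        // Callers only see the typed interface; the layering is hidden behind it.
        final SimpleStore<String, Long> store = newStringLongStore(changeLogging);
        store.put("kafka", 3L);
        System.out.println(store.get("kafka"));
    }
}
```

In this sketch the outermost wrapper is the only place where typed keys exist, which is why a null check like the one in this diff would sit in the metered layer: the key has to be validated before it is serialized and handed down.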