mjsax commented on a change in pull request #10548:
URL: https://github.com/apache/kafka/pull/10548#discussion_r619899204



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -234,13 +247,15 @@ public V delete(final K key) {
 
     @Override
     public <PS extends Serializer<P>, P> KeyValueIterator<K, V> 
prefixScan(final P prefix, final PS prefixKeySerializer) {
-
+        Objects.requireNonNull(prefix, "key cannot be null");

Review comment:
       KafkaStreams runtime _always_ "wraps" any store with a corresponding 
`MeteredXxxStore` -- those `MeteredXxxsStores` do the transaction from objects 
to bytes (ie they use the serdes) and also track state store metrics. (Note 
that stores provided to the runtime always have type `<Bytes, byte[]>` while 
they are exposed to `Processors` as `<K,V>` types.)
   
   This, when you call `context.stateStore(...)` you always get a 
`MeteredXxxStore` object -- of course, those details are hidden behind the 
interface type.
   
   This architecture allows us to unify code and separate concerns. In fact, it 
also allows us to add/remove more "layers": we can insert a "caching layer" 
(cf. 
https://kafka.apache.org/28/documentation/streams/developer-guide/memory-mgmt.html)
 and a changelogging layer (both are inserted by default).

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -234,13 +247,15 @@ public V delete(final K key) {
 
     @Override
     public <PS extends Serializer<P>, P> KeyValueIterator<K, V> 
prefixScan(final P prefix, final PS prefixKeySerializer) {
-
+        Objects.requireNonNull(prefix, "key cannot be null");

Review comment:
       KafkaStreams runtime _always_ "wraps" any store with a corresponding 
`MeteredXxxStore` (cf 
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilder.java#L42-L49)
 -- those `MeteredXxxsStores` do the transaction from objects to bytes (ie they 
use the serdes) and also track state store metrics. (Note that stores provided 
to the runtime always have type `<Bytes, byte[]>` while they are exposed to 
`Processors` as `<K,V>` types.)
   
   This, when you call `context.stateStore(...)` you always get a 
`MeteredXxxStore` object -- of course, those details are hidden behind the 
interface type.
   
   This architecture allows us to unify code and separate concerns. In fact, it 
also allows us to add/remove more "layers": we can insert a "caching layer" 
(cf. 
https://kafka.apache.org/28/documentation/streams/developer-guide/memory-mgmt.html)
 and a changelogging layer (both are inserted by default).

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -234,13 +247,15 @@ public V delete(final K key) {
 
     @Override
     public <PS extends Serializer<P>, P> KeyValueIterator<K, V> 
prefixScan(final P prefix, final PS prefixKeySerializer) {
-
+        Objects.requireNonNull(prefix, "key cannot be null");

Review comment:
       KafkaStreams runtime _always_ "wraps" any store with a corresponding 
`MeteredXxxStore` (cf 
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilder.java#L42-L49)
 -- those `MeteredXxxsStores` do the transaction from objects to bytes (ie they 
use the serdes) and also track state store metrics. (Note that stores provided 
to the runtime always have type `<Bytes, byte[]>` while they are exposed to 
`Processors` as `<K,V>` types.)
   
   This, when you call `context.stateStore(...)` you always get a 
`MeteredXxxStore` object -- of course, those details are hidden behind the 
interface type.
   
   This architecture allows us to unify code and separate concerns. In fact, it 
also allows us to add/remove more "layers": we can insert a "caching layer" 
(cf. 
https://kafka.apache.org/28/documentation/streams/developer-guide/memory-mgmt.html)
 and a "change logging layer" (both are inserted by default).

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -234,13 +247,15 @@ public V delete(final K key) {
 
     @Override
     public <PS extends Serializer<P>, P> KeyValueIterator<K, V> 
prefixScan(final P prefix, final PS prefixKeySerializer) {
-
+        Objects.requireNonNull(prefix, "key cannot be null");

Review comment:
       KafkaStreams runtime _always_ "wraps" any store with a corresponding 
`MeteredXxxStore` (cf 
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilder.java#L42-L49)
 -- those `MeteredXxxStores` do the transaction from objects to bytes (ie they 
use the serdes) and also track state store metrics. (Note that stores provided 
to the runtime always have type `<Bytes, byte[]>` while they are exposed to 
`Processors` as `<K,V>` types.)
   
   This, when you call `context.stateStore(...)` you always get a 
`MeteredXxxStore` object -- of course, those details are hidden behind the 
interface type.
   
   This architecture allows us to unify code and separate concerns. In fact, it 
also allows us to add/remove more "layers": we can insert a "caching layer" 
(cf. 
https://kafka.apache.org/28/documentation/streams/developer-guide/memory-mgmt.html)
 and a "change logging layer" (both are inserted by default).

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -234,13 +247,15 @@ public V delete(final K key) {
 
     @Override
     public <PS extends Serializer<P>, P> KeyValueIterator<K, V> 
prefixScan(final P prefix, final PS prefixKeySerializer) {
-
+        Objects.requireNonNull(prefix, "key cannot be null");

Review comment:
       KafkaStreams runtime _always_ "wraps" any store with a corresponding 
`MeteredXxxStore` (cf 
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilder.java#L42-L49)
 -- those `MeteredXxxStores` do the transaction from objects to bytes (ie they 
use the serdes) and also track state store metrics. (Note that stores provided 
to the runtime always have type `<Bytes, byte[]>` while they are exposed to 
`Processors` as `<K,V>` types.)
   
   Thus, when you call `context.stateStore(...)` you always get a 
`MeteredXxxStore` object -- of course, those details are hidden behind the 
interface type.
   
   This architecture allows us to unify code and separate concerns. In fact, it 
also allows us to add/remove more "layers": we can insert a "caching layer" 
(cf. 
https://kafka.apache.org/28/documentation/streams/developer-guide/memory-mgmt.html)
 and a "change logging layer" (both are inserted by default).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to