[GitHub] [kafka] mjsax commented on a change in pull request #10548: KAFKA-12396 added a nullcheck before trying to retrieve a key

2021-04-26 Thread GitBox


mjsax commented on a change in pull request #10548:
URL: https://github.com/apache/kafka/pull/10548#discussion_r620479940



##
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
##
@@ -402,6 +402,11 @@ public void shouldReturnKeysWithGivenPrefix() {
 assertThat(valuesWithPrefix.get(2), is("b"));
 }
 
+@Test
+public void shouldThrowNullPointerIfPrefixKeySerializerIsNull() {

Review comment:
   This test fails now, as the check was removed. No need to add this test 
any longer in this class.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #10548: KAFKA-12396 added a nullcheck before trying to retrieve a key

2021-04-25 Thread GitBox


mjsax commented on a change in pull request #10548:
URL: https://github.com/apache/kafka/pull/10548#discussion_r619899204



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##
@@ -234,13 +247,15 @@ public V delete(final K key) {
 
 @Override
 public <PS extends Serializer<P>, P> KeyValueIterator<K, V> 
prefixScan(final P prefix, final PS prefixKeySerializer) {
-
+Objects.requireNonNull(prefix, "key cannot be null");

Review comment:
   KafkaStreams runtime _always_ "wraps" any store with a corresponding 
`MeteredXxxStore` (cf. 
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilder.java#L42-L49)
 -- those `MeteredXxxStores` do the translation from objects to bytes (i.e., 
they use the serdes) and also track state store metrics. (Note that stores 
provided to the runtime always have type `<Bytes, byte[]>` while they are 
exposed to `Processors` as `<K, V>` types.)
   
   Thus, when you call `context.stateStore(...)` you always get a 
`MeteredXxxStore` object -- of course, those details are hidden behind the 
interface type.
   
   This architecture allows us to unify code and separate concerns. In fact, it 
also allows us to add/remove more "layers": we can insert a "caching layer" 
(cf. 
https://kafka.apache.org/28/documentation/streams/developer-guide/memory-mgmt.html)
 and a "change logging layer" (both are inserted by default).
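
To make the layering idea concrete, here is a minimal sketch. All class names in it (`BytesStore`, `InMemoryBytesStore`, `LoggingBytesStore`, `MeteredStringStore`) are hypothetical stand-ins invented for illustration, not the actual Kafka Streams classes, and the "serde" is just UTF-8 string encoding. It only shows the shape: the innermost store sees raw bytes, a middle layer records writes, and the outermost layer is the only one that handles typed objects and counts calls.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class LayeringSketch {
    public static void main(String[] args) {
        // Build the layers inside-out: bytes store -> logging layer -> typed outer layer.
        LoggingBytesStore logging = new LoggingBytesStore(new InMemoryBytesStore());
        MeteredStringStore store = new MeteredStringStore(logging);
        store.put("a", "1");
        System.out.println(store.get("a"));
        System.out.println(logging.log);
        System.out.println(store.puts);
    }
}

// Hypothetical bytes-only store interface (assumption, not a Kafka API).
interface BytesStore {
    void put(byte[] key, byte[] value);
    byte[] get(byte[] key);
}

// Innermost layer: only ever sees raw bytes.
class InMemoryBytesStore implements BytesStore {
    private final Map<ByteBuffer, byte[]> data = new HashMap<>();
    public void put(byte[] key, byte[] value) { data.put(ByteBuffer.wrap(key), value); }
    public byte[] get(byte[] key) { return data.get(ByteBuffer.wrap(key)); }
}

// Middle layer: records every written key, then delegates (stand-in for change logging).
class LoggingBytesStore implements BytesStore {
    final List<String> log = new ArrayList<>();
    private final BytesStore inner;
    LoggingBytesStore(BytesStore inner) { this.inner = inner; }
    public void put(byte[] key, byte[] value) {
        log.add(new String(key, StandardCharsets.UTF_8));
        inner.put(key, value);
    }
    public byte[] get(byte[] key) { return inner.get(key); }
}

// Outermost layer: converts typed objects to bytes and keeps a trivial "metric".
class MeteredStringStore {
    int puts = 0;
    private final BytesStore inner;
    MeteredStringStore(BytesStore inner) { this.inner = inner; }
    void put(String key, String value) {
        puts++;
        inner.put(key.getBytes(StandardCharsets.UTF_8), value.getBytes(StandardCharsets.UTF_8));
    }
    String get(String key) {
        byte[] raw = inner.get(key.getBytes(StandardCharsets.UTF_8));
        return raw == null ? null : new String(raw, StandardCharsets.UTF_8);
    }
}
```

Because every layer below the outermost one shares the same bytes-only interface, a layer can be added or dropped without the caller noticing.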

[GitHub] [kafka] mjsax commented on a change in pull request #10548: KAFKA-12396 added a nullcheck before trying to retrieve a key

2021-04-22 Thread GitBox


mjsax commented on a change in pull request #10548:
URL: https://github.com/apache/kafka/pull/10548#discussion_r618760487



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##
@@ -219,11 +224,19 @@ public V putIfAbsent(final K key,
 
 @Override
 public void putAll(final List<KeyValue<K, V>> entries) {
+final List> possiblyNullKeys = entries

Review comment:
   I think we could simplify this to a one-liner?
   ```
   entries.forEach(entry -> Objects.requireNonNull(entry.key, "key cannot be null"));
   ```
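
As a runnable illustration of that one-liner, here is a small sketch. The `Entry` class is a hypothetical stand-in for a key-value pair with a public `key` field, and `requireNonNullKeys` is a helper name made up for this example. The check runs over all entries before anything is written.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

class PutAllNullCheckSketch {
    // Hypothetical key-value pair with public fields (assumption for this sketch).
    static final class Entry<K, V> {
        final K key;
        final V value;
        Entry(K key, V value) {
            this.key = key;
            this.value = value;
        }
    }

    // Throws NullPointerException on the first entry whose key is null.
    static <K, V> void requireNonNullKeys(List<Entry<K, V>> entries) {
        entries.forEach(entry -> Objects.requireNonNull(entry.key, "key cannot be null"));
    }

    public static void main(String[] args) {
        List<Entry<String, Integer>> good =
            Arrays.asList(new Entry<String, Integer>("a", 1), new Entry<String, Integer>("b", 2));
        requireNonNullKeys(good); // passes silently

        List<Entry<String, Integer>> bad =
            Arrays.asList(new Entry<String, Integer>("a", 1), new Entry<String, Integer>(null, 2));
        try {
            requireNonNullKeys(bad);
        } catch (NullPointerException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```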

##
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
##
@@ -472,11 +472,26 @@ public void shouldThrowNullPointerOnRemoveIfKeyIsNull() {
 assertThrows(NullPointerException.class, () -> store.remove(null));
 }
 
+@Test
+public void shouldThrowNullPointerOnPutIfWrappedKeyIsNull() {
+assertThrows(NullPointerException.class, () -> store.put(new 
Windowed<>(null, new SessionWindow(0, 0)), "a"));

Review comment:
   This test reminds me that the wrapped `SessionWindow` should not be 
`null` either. 

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##
@@ -234,13 +247,15 @@ public V delete(final K key) {
 
 @Override
 public <PS extends Serializer<P>, P> KeyValueIterator<K, V> 
prefixScan(final P prefix, final PS prefixKeySerializer) {
-
+Objects.requireNonNull(prefix, "key cannot be null");

Review comment:
   As mentioned by @cadonna, the wrapped stores also check 
`prefixKeySerializer` for null -- thus it might be good to move both checks here.
   
   I think we can also remove both checks in `RocksDBStore` and 
`InMemoryKeyValueStore` -- they seem to be redundant now? 
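
A rough sketch of what "both checks in the outer layer" could look like. Everything here is simplified and hypothetical: the inner store is a plain `TreeMap<String, String>`, the serializer is a `Function<P, String>` rather than a real Kafka `Serializer`, and the exception messages are placeholders. The point is only that the inner scan does no validation because the outer method has already done it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.function.Function;

class PrefixScanGuardSketch {
    // Inner scan: assumes its arguments were validated by the caller.
    static List<String> innerPrefixScan(TreeMap<String, String> data, String prefix) {
        List<String> values = new ArrayList<>();
        // Keys sharing a prefix are contiguous in sorted order, starting at the prefix itself.
        for (Map.Entry<String, String> e : data.tailMap(prefix, true).entrySet()) {
            if (!e.getKey().startsWith(prefix)) {
                break;
            }
            values.add(e.getValue());
        }
        return values;
    }

    // Outer method: validates both arguments once, then delegates.
    static <P> List<String> prefixScan(TreeMap<String, String> data,
                                       P prefix,
                                       Function<P, String> prefixKeySerializer) {
        Objects.requireNonNull(prefix, "prefix cannot be null");
        Objects.requireNonNull(prefixKeySerializer, "prefixKeySerializer cannot be null");
        return innerPrefixScan(data, prefixKeySerializer.apply(prefix));
    }

    public static void main(String[] args) {
        TreeMap<String, String> data = new TreeMap<>();
        data.put("a1", "x");
        data.put("a2", "y");
        data.put("b1", "z");
        System.out.println(prefixScan(data, "a", p -> p));
    }
}
```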








[GitHub] [kafka] mjsax commented on a change in pull request #10548: KAFKA-12396 added a nullcheck before trying to retrieve a key

2021-04-20 Thread GitBox


mjsax commented on a change in pull request #10548:
URL: https://github.com/apache/kafka/pull/10548#discussion_r617136596



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
##
@@ -73,6 +76,7 @@
 public boolean putIfDifferentValues(final K key,
 final ValueAndTimestamp<V> newValue,
 final byte[] oldSerializedValue) {
+Objects.requireNonNull(key, "key cannot be null");

Review comment:
   Actually not public API -- don't need this check.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
##
@@ -59,6 +61,7 @@
 
 
 public RawAndDeserializedValue<V> getWithBinary(final K key) {
+Objects.requireNonNull(key, "key cannot be null");

Review comment:
   Actually not public API -- don't need this check.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##
@@ -326,6 +326,7 @@ public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
 
 @Override
 public synchronized byte[] get(final Bytes key) {
+Objects.requireNonNull(key, "key cannot be null");

Review comment:
   This check can be removed, as we do the check in the "metered" store that 
will wrap this one.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##
@@ -290,6 +295,7 @@ protected V outerValue(final byte[] value) {
 }
 
 protected Bytes keyBytes(final K key) {
+Objects.requireNonNull(key, "key cannot be null");

Review comment:
   This method is not `public`, so no need to add this check.








[GitHub] [kafka] mjsax commented on a change in pull request #10548: KAFKA-12396 added a nullcheck before trying to retrieve a key

2021-04-16 Thread GitBox


mjsax commented on a change in pull request #10548:
URL: https://github.com/apache/kafka/pull/10548#discussion_r615007673



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##
@@ -326,6 +326,7 @@ public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
 
 @Override
 public synchronized byte[] get(final Bytes key) {
+Objects.requireNonNull(key, "key cannot be null");

Review comment:
   I think we should add this to `MeteredKeyValueStore` instead -- this 
way, we get the same guard for the in-memory key-value store (or other custom 
stores).
   
   We should also add this check to other methods that accept a key, like 
`put`, `putIfAbsent`, etc.
   
   Furthermore, we should also add it for other store types, i.e., 
`MeteredTimestampedKeyValueStore`, `MeteredWindowStore`, 
`MeteredTimestampedWindowStore`, and `MeteredSessionStore`.
   
   Last, but not least, we should add corresponding unit tests.
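
For illustration only, a sketch of the guard plus the kind of test being asked for. `GuardedStore` is a hypothetical wrapper, not a Kafka class, and the inner `HashMap` stands in for any store implementation. Since `HashMap` itself accepts `null` keys, any `NullPointerException` seen here comes from the wrapper's guard. The checks are written in plain Java rather than JUnit so the example has no dependencies.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

class NullKeyGuardSketch {
    // Hypothetical outer wrapper: every key-accepting method checks the key first.
    static final class GuardedStore<K, V> {
        private final Map<K, V> inner;
        GuardedStore(Map<K, V> inner) { this.inner = inner; }

        V get(K key) {
            Objects.requireNonNull(key, "key cannot be null");
            return inner.get(key);
        }
        void put(K key, V value) {
            Objects.requireNonNull(key, "key cannot be null");
            inner.put(key, value);
        }
        V putIfAbsent(K key, V value) {
            Objects.requireNonNull(key, "key cannot be null");
            return inner.putIfAbsent(key, value);
        }
        V delete(K key) {
            Objects.requireNonNull(key, "key cannot be null");
            return inner.remove(key);
        }
    }

    // Returns true if running the action throws NullPointerException.
    static boolean throwsNpe(Runnable action) {
        try {
            action.run();
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        GuardedStore<String, String> store = new GuardedStore<>(new HashMap<String, String>());
        // One check per key-accepting method, mirroring one unit test per method.
        Map<String, Runnable> calls = new LinkedHashMap<>();
        calls.put("get", () -> store.get(null));
        calls.put("put", () -> store.put(null, "v"));
        calls.put("putIfAbsent", () -> store.putIfAbsent(null, "v"));
        calls.put("delete", () -> store.delete(null));
        calls.forEach((name, call) ->
            System.out.println(name + " rejects null key: " + throwsNpe(call)));
    }
}
```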
   



