[GitHub] [kafka] mjsax commented on a change in pull request #10548: KAFKA-12396 added a nullcheck before trying to retrieve a key
mjsax commented on a change in pull request #10548:
URL: https://github.com/apache/kafka/pull/10548#discussion_r620479940

## File path: streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java ##
@@ -402,6 +402,11 @@ public void shouldReturnKeysWithGivenPrefix() {
 assertThat(valuesWithPrefix.get(2), is("b"));
 }
+@Test
+public void shouldThrowNullPointerIfPrefixKeySerializerIsNull() {

Review comment: This test fails now, as the check was removed. No need to add this test any longer in this class.

-- 
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
mjsax commented on a change in pull request #10548:
URL: https://github.com/apache/kafka/pull/10548#discussion_r619899204

## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java ##
@@ -234,13 +247,15 @@ public V delete(final K key) {
 @Override
 public <PS extends Serializer<P>, P> KeyValueIterator<K, V> prefixScan(final P prefix, final PS prefixKeySerializer) {
-
+Objects.requireNonNull(prefix, "key cannot be null");

Review comment: The Kafka Streams runtime _always_ "wraps" any store with a corresponding `MeteredXxxStore` (cf. https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilder.java#L42-L49). Those `MeteredXxxStores` do the translation from objects to bytes (i.e., they use the serdes) and also track state store metrics. (Note that stores provided to the runtime always have type `<Bytes, byte[]>`, while they are exposed to `Processors` as `<K, V>` types.) Thus, when you call `context.getStateStore(...)`, you always get a `MeteredXxxStore` object. Of course, those details are hidden behind the interface type. This architecture allows us to unify code and separate concerns. In fact, it also allows us to add or remove more "layers": we can insert a "caching layer" (cf. https://kafka.apache.org/28/documentation/streams/developer-guide/memory-mgmt.html) and a "change logging layer" (both are inserted by default).
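The following self-contained Java sketch illustrates the layering described in this comment. It is not Kafka's code. `BytesStore`, `InMemoryBytesStore`, `ChangeLoggingBytesStore` and `MeteredStore` are hypothetical stand-ins. Serdes are modeled as plain `Function`s, and a `putCount` field stands in for real metrics. The outer typed layer validates its input and serializes it. The inner layers only ever see bytes, so a byte-level decorator can be added or removed without touching the other layers.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;

// Hypothetical byte-level store interface (roughly the role of a <Bytes, byte[]> store).
interface BytesStore {
    void put(ByteBuffer key, byte[] value);
    byte[] get(ByteBuffer key);
}

// Innermost layer: a plain in-memory map; ByteBuffer gives content-based equals/hashCode.
class InMemoryBytesStore implements BytesStore {
    private final Map<ByteBuffer, byte[]> map = new HashMap<>();
    public void put(ByteBuffer key, byte[] value) { map.put(key, value); }
    public byte[] get(ByteBuffer key) { return map.get(key); }
}

// Middle layer: a byte-level decorator that records written keys, loosely modeling
// a change-logging layer. It only sees bytes, so it works for any K and V above it.
class ChangeLoggingBytesStore implements BytesStore {
    private final BytesStore inner;
    final List<ByteBuffer> changelog = new ArrayList<>();
    ChangeLoggingBytesStore(BytesStore inner) { this.inner = inner; }
    public void put(ByteBuffer key, byte[] value) {
        inner.put(key, value);
        changelog.add(key);
    }
    public byte[] get(ByteBuffer key) { return inner.get(key); }
}

// Outermost layer: the typed <K, V> view handed to processor code. It validates
// input, translates objects to bytes, counts puts, and delegates downward.
class MeteredStore<K, V> {
    private final BytesStore inner;
    private final Function<K, byte[]> keySerializer;
    private final Function<V, byte[]> valueSerializer;
    private final Function<byte[], V> valueDeserializer;
    long putCount = 0; // stand-in for a real put-rate/latency metric

    MeteredStore(BytesStore inner, Function<K, byte[]> keySerializer,
                 Function<V, byte[]> valueSerializer, Function<byte[], V> valueDeserializer) {
        this.inner = inner;
        this.keySerializer = keySerializer;
        this.valueSerializer = valueSerializer;
        this.valueDeserializer = valueDeserializer;
    }

    void put(K key, V value) {
        Objects.requireNonNull(key, "key cannot be null");
        putCount++;
        byte[] rawValue = value == null ? null : valueSerializer.apply(value);
        inner.put(ByteBuffer.wrap(keySerializer.apply(key)), rawValue);
    }

    V get(K key) {
        Objects.requireNonNull(key, "key cannot be null");
        byte[] raw = inner.get(ByteBuffer.wrap(keySerializer.apply(key)));
        return raw == null ? null : valueDeserializer.apply(raw);
    }
}
```

Layers are wired from the outermost to the innermost, e.g. `new MeteredStore<String, String>(new ChangeLoggingBytesStore(new InMemoryBytesStore()), ...)`. To drop the logging layer, pass the `InMemoryBytesStore` directly; the typed layer stays unchanged.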
mjsax commented on a change in pull request #10548:
URL: https://github.com/apache/kafka/pull/10548#discussion_r618760487

## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java ##
@@ -219,11 +224,19 @@ public V putIfAbsent(final K key,
 @Override
 public void putAll(final List<KeyValue<K, V>> entries) {
+final List> possiblyNullKeys = entries

Review comment: I think we could simplify this to a one-liner:
```
entries.forEach(entry -> Objects.requireNonNull(entry.key, "key cannot be null"));
```

## File path: streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java ##
@@ -472,11 +472,26 @@ public void shouldThrowNullPointerOnRemoveIfKeyIsNull() {
 assertThrows(NullPointerException.class, () -> store.remove(null));
 }
+@Test
+public void shouldThrowNullPointerOnPutIfWrappedKeyIsNull() {
+assertThrows(NullPointerException.class, () -> store.put(new Windowed<>(null, new SessionWindow(0, 0)), "a"));

Review comment: This test reminds me that the wrapped `SessionWindow` should not be `null` either.

## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java ##
@@ -234,13 +247,15 @@ public V delete(final K key) {
 @Override
 public <PS extends Serializer<P>, P> KeyValueIterator<K, V> prefixScan(final P prefix, final PS prefixKeySerializer) {
-
+Objects.requireNonNull(prefix, "key cannot be null");

Review comment: As mentioned by @cadonna, the wrapped stores also check `prefixKeySerializer` for null, so it might be good to move both checks here. I think we can also remove both checks in `RocksDBStore` and `InMemoryKeyValueStore`; they seem to be redundant now.
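The sketch below covers the three suggestions in this comment. It uses hypothetical stand-ins rather than Kafka's real classes:

- `KeyValue` mirrors only the public `key`/`value` fields.
- `SessionBounds` and `WindowedKey` model a key wrapped together with its window.
- A plain `Function` stands in for the prefix key serializer.
- `OuterLayerChecks` is an illustrative name.

All validation happens once, in the outer layer, before anything would be delegated to a wrapped store.

```java
import java.util.List;
import java.util.Objects;
import java.util.function.Function;

// Minimal stand-in for a key-value pair with public final fields.
class KeyValue<K, V> {
    public final K key;
    public final V value;
    KeyValue(K key, V value) { this.key = key; this.value = value; }
}

// Hypothetical stand-ins for a session window and a key wrapped with its window.
class SessionBounds {
    final long start;
    final long end;
    SessionBounds(long start, long end) { this.start = start; this.end = end; }
}

class WindowedKey<K> {
    final K key;
    final SessionBounds window;
    WindowedKey(K key, SessionBounds window) { this.key = key; this.window = window; }
}

final class OuterLayerChecks {
    private OuterLayerChecks() { }

    // The suggested one-liner; call it before delegating so that nothing is
    // written when any key in the batch is null.
    static <K, V> void checkPutAll(List<KeyValue<K, V>> entries) {
        entries.forEach(entry -> Objects.requireNonNull(entry.key, "key cannot be null"));
    }

    // A windowed key is valid only if the wrapper, the wrapped key, and the window are non-null.
    static <K> void checkWindowedKey(WindowedKey<K> windowedKey) {
        Objects.requireNonNull(windowedKey, "windowed key cannot be null");
        Objects.requireNonNull(windowedKey.key, "wrapped key cannot be null");
        Objects.requireNonNull(windowedKey.window, "wrapped window cannot be null");
    }

    // Both prefix-scan arguments are checked here, so inner stores need not repeat the checks.
    static <P> void checkPrefixScan(P prefix, Function<P, byte[]> prefixKeySerializer) {
        Objects.requireNonNull(prefix, "prefix cannot be null");
        Objects.requireNonNull(prefixKeySerializer, "prefixKeySerializer cannot be null");
    }
}
```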
mjsax commented on a change in pull request #10548:
URL: https://github.com/apache/kafka/pull/10548#discussion_r617136596

## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java ##
@@ -73,6 +76,7 @@ public boolean putIfDifferentValues(final K key,
 final ValueAndTimestamp<V> newValue,
 final byte[] oldSerializedValue) {
+Objects.requireNonNull(key, "key cannot be null");

Review comment: Actually not public API, so we don't need this check.

## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java ##
@@ -59,6 +61,7 @@ public RawAndDeserializedValue<V> getWithBinary(final K key) {
+Objects.requireNonNull(key, "key cannot be null");

Review comment: Actually not public API, so we don't need this check.

## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java ##
@@ -326,6 +326,7 @@ public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
 @Override
 public synchronized byte[] get(final Bytes key) {
+Objects.requireNonNull(key, "key cannot be null");

Review comment: This check can be removed, as we do the check in the "metered" store that wraps this one.

## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java ##
@@ -290,6 +295,7 @@ protected V outerValue(final byte[] value) {
 }
 protected Bytes keyBytes(final K key) {
+Objects.requireNonNull(key, "key cannot be null");

Review comment: This method is not `public`, so there is no need to add this check.
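The reasoning in these comments is that a null check belongs at the public entry point. It is redundant in non-public helpers and in the wrapped store. Below is a minimal sketch under that assumption. `CheckedOuterStore` and `UncheckedInnerStore` are hypothetical simplifications, and the `keyBytes` helper here only borrows its name from the diff. A call counter shows that a rejected null never reaches the inner store.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical inner store: no null check of its own, because it is only ever
// reached through the outer layer below.
class UncheckedInnerStore {
    final Map<ByteBuffer, String> data = new HashMap<>();
    int calls = 0; // how many requests actually reached this layer

    String get(ByteBuffer key) {
        calls++;
        return data.get(key);
    }
}

// Hypothetical outer layer: its public method is the single validation point.
class CheckedOuterStore {
    private final UncheckedInnerStore inner;

    CheckedOuterStore(UncheckedInnerStore inner) { this.inner = inner; }

    public String get(String key) {
        Objects.requireNonNull(key, "key cannot be null"); // checked once, at the API boundary
        return inner.get(keyBytes(key));
    }

    // Non-public helper: every caller has already validated the key, so no re-check.
    protected ByteBuffer keyBytes(String key) {
        return ByteBuffer.wrap(key.getBytes(StandardCharsets.UTF_8));
    }
}
```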
mjsax commented on a change in pull request #10548:
URL: https://github.com/apache/kafka/pull/10548#discussion_r615007673

## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java ##
@@ -326,6 +326,7 @@ public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
 @Override
 public synchronized byte[] get(final Bytes key) {
+Objects.requireNonNull(key, "key cannot be null");

Review comment: I think we should add this to `MeteredKeyValueStore` instead. This way, we get the same guard for the in-memory key-value store (or other custom stores). We should also add this check to other methods that accept a key, like `put`, `putIfAbsent`, etc. Furthermore, we should add it for the other store types, i.e., `MeteredTimestampedKeyValueStore`, `MeteredWindowStore`, `MeteredTimestampedWindowStore` and `MeteredSessionStore`. Last but not least, we should add corresponding unit tests.
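Here is a minimal sketch of the suggested coverage, using hypothetical names. `GuardedKeyValueStore` stands in for a metered store and guards every key-accepting method. `assertThrowsNpe` is a tiny dependency-free substitute for JUnit's `assertThrows(NullPointerException.class, ...)`. The backing `HashMap` accepts null keys, so each test passes only because of the explicit guard.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical typed store standing in for a metered store: every method that
// accepts a key validates it before doing anything else.
class GuardedKeyValueStore<K, V> {
    private final Map<K, V> data = new HashMap<>(); // HashMap itself would accept a null key

    V get(K key) {
        Objects.requireNonNull(key, "key cannot be null");
        return data.get(key);
    }

    void put(K key, V value) {
        Objects.requireNonNull(key, "key cannot be null");
        data.put(key, value);
    }

    V putIfAbsent(K key, V value) {
        Objects.requireNonNull(key, "key cannot be null");
        return data.putIfAbsent(key, value);
    }

    V delete(K key) {
        Objects.requireNonNull(key, "key cannot be null");
        return data.remove(key);
    }
}

// Tiny stand-in for JUnit's assertThrows, so the sketch runs without dependencies.
final class NullKeyTests {
    interface Call { void run(); }

    static void assertThrowsNpe(Call call) {
        try {
            call.run();
        } catch (NullPointerException expected) {
            return;
        }
        throw new AssertionError("expected NullPointerException");
    }

    // One check per key-accepting method, in the spirit of the shouldThrowNullPointerOn...IfKeyIsNull tests.
    static void runAll() {
        GuardedKeyValueStore<String, String> store = new GuardedKeyValueStore<>();
        assertThrowsNpe(() -> store.get(null));
        assertThrowsNpe(() -> store.put(null, "a"));
        assertThrowsNpe(() -> store.putIfAbsent(null, "a"));
        assertThrowsNpe(() -> store.delete(null));
    }
}
```

The same pattern would be repeated per store type. Only the key shape changes, e.g. a windowed key for window and session stores.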