mjsax commented on a change in pull request #8706:
URL: https://github.com/apache/kafka/pull/8706#discussion_r432144610



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java
##########
@@ -58,9 +58,21 @@ public QueryableStoreProvider(final 
List<StreamThreadStateStoreProvider> storePr
         }
         final List<T> allStores = new ArrayList<>();
         for (final StreamThreadStateStoreProvider storeProvider : 
storeProviders) {
-            allStores.addAll(storeProvider.stores(storeQueryParameters));
+            final List<T> stores = storeProvider.stores(storeQueryParameters);
+            if (stores != null && !stores.isEmpty()) {

Review comment:
       Thinking about this once more, `stores` can never be `null`? Can we 
remove this check?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java
##########
@@ -48,7 +48,9 @@ public void setStoreQueryParameters(final 
StoreQueryParameters storeQueryParamet
         final List<T> allStores = new ArrayList<>();
         for (final StreamThreadStateStoreProvider provider : storeProviders) {
             final List<T> stores = provider.stores(storeQueryParameters);
-            allStores.addAll(stores);
+            if (stores != null && !stores.isEmpty()) {
+                allStores.addAll(stores);
+            }

Review comment:
       As above: `stores` should never be `null`, and thus we don't need this 
change? Also, the check for `isEmpty` does not give us much; we can still call 
`addAll` even if `stores` is empty?
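
   To illustrate the `isEmpty` point, here is a minimal, self-contained sketch. The class `AddAllEmptyDemo` and the method `collect` are hypothetical stand-ins, not code from this PR; the sketch only relies on `List.addAll` accepting an empty list and adding nothing.

   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.Collections;
   import java.util.List;

   public class AddAllEmptyDemo {
       // Hypothetical stand-in for gathering the stores returned by several providers.
       static <T> List<T> collect(final List<List<T>> perProvider) {
           final List<T> all = new ArrayList<>();
           for (final List<T> stores : perProvider) {
               // No isEmpty() guard: addAll on an empty list adds nothing.
               all.addAll(stores);
           }
           return all;
       }

       public static void main(final String[] args) {
           final List<List<String>> input = Arrays.asList(
               Arrays.asList("a"),
               Collections.<String>emptyList(),
               Arrays.asList("b")
           );
           System.out.println(collect(input));
       }
   }
   ```

   The guard would only matter if a provider could return `null`, which is the assumption questioned above.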

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/QueryableStoreProviderTest.java
##########
@@ -88,5 +91,23 @@ public void shouldFindGlobalStores() {
         
assertNotNull(storeProvider.getStore(StoreQueryParameters.fromNameAndType("global",
 QueryableStoreTypes.keyValueStore())));
     }
 
+    @Test
+    public void shouldReturnKVStoreWithPartitionWhenItExists() {
+        
assertNotNull(storeProvider.getStore(StoreQueryParameters.fromNameAndType(keyValueStore,
 QueryableStoreTypes.keyValueStore()).withPartition(numStateStorePartitions - 
1)));

Review comment:
       It might be better to test if the right store is returned instead of 
just checking for not-null? For this, in `before()` we need to keep a reference 
to the store we pass into `addStore()`?

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/QueryableStoreProviderTest.java
##########
@@ -88,5 +91,23 @@ public void shouldFindGlobalStores() {
         
assertNotNull(storeProvider.getStore(StoreQueryParameters.fromNameAndType("global",
 QueryableStoreTypes.keyValueStore())));
     }
 
+    @Test
+    public void shouldReturnKVStoreWithPartitionWhenItExists() {
+        
assertNotNull(storeProvider.getStore(StoreQueryParameters.fromNameAndType(keyValueStore,
 QueryableStoreTypes.keyValueStore()).withPartition(numStateStorePartitions - 
1)));
+    }
+
+    @Test(expected = InvalidStateStoreException.class)
+    public void shouldThrowExceptionWhenKVStoreWithPartitionDoesntExists() {
+        
storeProvider.getStore(StoreQueryParameters.fromNameAndType(keyValueStore, 
QueryableStoreTypes.keyValueStore()).withPartition(numStateStorePartitions + 
1));

Review comment:
       Can we split this as follows:
   ```
   final StoreQueryParameters parameters = 
StoreQueryParameters.fromNameAndType(keyValueStore, 
QueryableStoreTypes.keyValueStore()).withPartition(numStateStorePartitions + 1);
   
   final InvalidStateStoreException exception = assertThrows(
     InvalidStateStoreException.class,
     () -> storeProvider.getStore(parameters)
   );
   assertThat(exception.getMessage(), equalTo("..."));
   ```
   
   And remove the `(expected = ...)` annotation.
   
   (1) We should always limit the code that might throw the exception (e.g., if 
`withPartition` would throw an `InvalidStateStoreException` the test should 
fail, but would pass in its current setup). (2) We should always verify the 
exception cause: `getStore()` could throw an `InvalidStateStoreException` for 
multiple reasons and we should make sure it's throwing for the reason under 
test.
   
   Same below for the windowed case.
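
   The two points can also be shown without JUnit or Kafka on the classpath. The following is only a sketch under stated assumptions: `StoreException`, `getStore`, and `expectThrows` are hypothetical stand-ins (for `InvalidStateStoreException`, the provider lookup, and JUnit's `assertThrows`, respectively), and the message text is invented for the example.

   ```java
   public class NarrowThrowScopeDemo {
       // Hypothetical stand-in for InvalidStateStoreException.
       static class StoreException extends RuntimeException {
           StoreException(final String message) {
               super(message);
           }
       }

       // Hypothetical lookup: fails for partitions outside [0, numPartitions).
       static String getStore(final int partition, final int numPartitions) {
           if (partition < 0 || partition >= numPartitions) {
               throw new StoreException("partition " + partition + " does not exist");
           }
           return "store-" + partition;
       }

       // Minimal stand-in for JUnit's assertThrows: runs only `action` and
       // returns the thrown exception so the caller can inspect it.
       static <E extends Throwable> E expectThrows(final Class<E> type, final Runnable action) {
           try {
               action.run();
           } catch (final Throwable t) {
               if (type.isInstance(t)) {
                   return type.cast(t);
               }
               throw new AssertionError("unexpected exception type: " + t.getClass().getName(), t);
           }
           throw new AssertionError("expected " + type.getName() + " but nothing was thrown");
       }

       public static void main(final String[] args) {
           final int numPartitions = 2;
           // Setup stays outside the lambda, so an exception thrown here
           // cannot accidentally satisfy the assertion.
           final int missingPartition = numPartitions + 1;

           final StoreException e = expectThrows(
               StoreException.class,
               () -> getStore(missingPartition, numPartitions)
           );
           // Check the reason, not just the exception type.
           if (!e.getMessage().equals("partition 3 does not exist")) {
               throw new AssertionError(e.getMessage());
           }
       }
   }
   ```

   With an annotation-level expectation, an exception of the same type thrown during setup would make the test pass for the wrong reason; keeping only the call under test inside the lambda avoids that.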



