vvcephei commented on a change in pull request #9020: URL: https://github.com/apache/kafka/pull/9020#discussion_r455948704
##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
##########
@@ -51,18 +51,22 @@ public StreamThreadStateStoreProvider(final StreamThread streamThread) {
         final StreamThread.State state = streamThread.state();
         if (storeQueryParams.staleStoresEnabled() ? state.isAlive() : state == StreamThread.State.RUNNING) {
             final Map<TaskId, ? extends Task> tasks = storeQueryParams.staleStoresEnabled() ? streamThread.allTasks() : streamThread.activeTaskMap();
+            final List<T> stores = new ArrayList<>();
             if (storeQueryParams.partition() != null) {
                 final Task streamTask = findStreamTask(tasks, storeName, storeQueryParams.partition());
-                if (streamTask == null) {
-                    return Collections.emptyList();
+                if (streamTask != null) {
+                    final T store = validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id());
+                    if (store != null) {
+                        stores.add(store);
+                    }
                 }
-                final T store = validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id());
-                return store != null ? Collections.singletonList(store) : Collections.emptyList();
+            } else {
+                tasks.values().stream().
+                        map(streamTask -> validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id())).
+                        filter(Objects::nonNull).
+                        forEach(stores::add);
             }
-            return tasks.values().stream().
-                    map(streamTask -> validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id())).
-                    filter(Objects::nonNull).
-                    collect(Collectors.toList());
+            return Collections.unmodifiableList(stores);

Review comment:
Ah, sorry, I can see that my prior comment was ambiguous. This is what I meant:

```java
if (storeQueryParams.partition() == null) {
    return tasks.values().stream().
            map(streamTask -> validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id())).
            filter(Objects::nonNull).
            collect(Collectors.toList());
} else {
    final Task streamTask = findStreamTask(tasks, storeName, storeQueryParams.partition());
    if (streamTask == null) {
        return Collections.emptyList();
    } else {
        final T store = validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id());
        return store == null ? Collections.emptyList() : Collections.singletonList(store);
    }
}
```

The reason this is better for maintenance is that you only have to trace one path through the nested conditionals into a single inner block to understand what gets returned. I.e., code comprehension complexity is only the depth of the conditional tree.

In contrast, if we do early returns, you have to fully read all the conditional blocks that lead up to the one you're interested in (depth-first traversal), so code comprehension is linear instead of logarithmic. If we mutate the collection, you actually have to read _all_ the conditionals to understand what is going to happen, so code comprehension is also linear instead of logarithmic.
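
To make the shape concrete outside of the Streams code, here is a minimal, self-contained sketch of the same pattern. Everything in it is hypothetical and for illustration only: `StoreLookupSketch`, `validate`, and the `Map<Integer, String>` standing in for the task map are not part of Kafka Streams. The point is only that every leaf of the conditional tree returns its own result and nothing is mutated on the way there.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

// Hypothetical stand-in for the store provider; not the Kafka Streams API.
final class StoreLookupSketch {

    // Hypothetical validation step: returns null when a store is not usable,
    // mirroring how a null result is filtered out in the suggestion above.
    static String validate(final String store) {
        return store == null || store.isEmpty() ? null : store;
    }

    // Each branch ends in its own return, so understanding one outcome only
    // requires following one path down the conditionals.
    static List<String> stores(final Map<Integer, String> storesByPartition, final Integer partition) {
        if (partition == null) {
            // No partition requested: return every store that validates.
            return storesByPartition.values().stream().
                    map(StoreLookupSketch::validate).
                    filter(Objects::nonNull).
                    collect(Collectors.toList());
        } else {
            final String candidate = storesByPartition.get(partition);
            if (candidate == null) {
                // No entry for the requested partition.
                return Collections.emptyList();
            } else {
                final String store = validate(candidate);
                return store == null ? Collections.emptyList() : Collections.singletonList(store);
            }
        }
    }
}
```

With a map of `{0: "a", 1: "", 2: "c"}`, `stores(map, null)` returns `[a, c]`, `stores(map, 0)` returns `[a]`, and both `stores(map, 1)` and `stores(map, 7)` return an empty list. Each of those outcomes can be read off a single branch.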