vvcephei commented on a change in pull request #9020:
URL: https://github.com/apache/kafka/pull/9020#discussion_r455948704
##########
File path:
streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
##########
@@ -51,18 +51,22 @@ public StreamThreadStateStoreProvider(final StreamThread streamThread) {
         final StreamThread.State state = streamThread.state();
         if (storeQueryParams.staleStoresEnabled() ? state.isAlive() : state == StreamThread.State.RUNNING) {
             final Map<TaskId, ? extends Task> tasks = storeQueryParams.staleStoresEnabled() ? streamThread.allTasks() : streamThread.activeTaskMap();
+            final List<T> stores = new ArrayList<>();
             if (storeQueryParams.partition() != null) {
                 final Task streamTask = findStreamTask(tasks, storeName, storeQueryParams.partition());
-                if (streamTask == null) {
-                    return Collections.emptyList();
+                if (streamTask != null) {
+                    final T store = validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id());
+                    if (store != null) {
+                        stores.add(store);
+                    }
                 }
-                final T store = validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id());
-                return store != null ? Collections.singletonList(store) : Collections.emptyList();
+            } else {
+                tasks.values().stream().
+                    map(streamTask -> validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id())).
+                    filter(Objects::nonNull).
+                    forEach(stores::add);
             }
-            return tasks.values().stream().
-                map(streamTask -> validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id())).
-                filter(Objects::nonNull).
-                collect(Collectors.toList());
+            return Collections.unmodifiableList(stores);
Review comment:
Ah, sorry, I can see that my prior comment was ambiguous. This is what I
meant:
```java
if (storeQueryParams.partition() == null) {
    return tasks.values().stream().
        map(streamTask -> validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id())).
        filter(Objects::nonNull).
        collect(Collectors.toList());
} else {
    final Task streamTask = findStreamTask(tasks, storeName, storeQueryParams.partition());
    if (streamTask == null) {
        return Collections.emptyList();
    } else {
        final T store = validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id());
        return store == null ? Collections.emptyList() : Collections.singletonList(store);
    }
}
```
The reason this is better for maintenance is that you only have to trace a
single path through the nested conditionals into one inner block to understand
what gets returned. I.e., code comprehension complexity is only the depth of
the conditional tree.
In contrast, if we do early returns, you have to fully read all the
conditional blocks that lead up to the one you're interested in (depth-first
traversal), so code comprehension is linear instead of logarithmic. If we
mutate the collection, you actually have to read _all_ the conditionals to
understand what is going to happen, so code comprehension is also linear
instead of logarithmic.
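To make the comparison concrete, here is a minimal, self-contained sketch of the shape I'm suggesting, where every leaf of the conditional tree returns its own result. It is not the Kafka code: `StoreLookupSketch`, `storesFor`, and the partition-to-name map are hypothetical stand-ins for `tasks`, `findStreamTask`, and `validateAndListStores`.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

public class StoreLookupSketch {

    // Hypothetical stand-in for the store lookup: the map plays the role of
    // `tasks`, and a null value plays the role of a store that failed validation.
    static List<String> storesFor(final Map<Integer, String> storesByPartition, final Integer partition) {
        if (partition == null) {
            // No partition requested: every valid store is returned.
            return storesByPartition.values().stream()
                .filter(Objects::nonNull)
                .collect(Collectors.toList());
        } else {
            final String store = storesByPartition.get(partition);
            if (store == null) {
                // Unknown partition or invalid store: nothing to return.
                return Collections.emptyList();
            } else {
                return Collections.singletonList(store);
            }
        }
    }

    public static void main(final String[] args) {
        final Map<Integer, String> stores = new LinkedHashMap<>();
        stores.put(0, "store-0");
        stores.put(1, "store-1");
        System.out.println(storesFor(stores, null)); // all stores
        System.out.println(storesFor(stores, 1));    // one store
        System.out.println(storesFor(stores, 7));    // no such partition
    }
}
```

To answer "what comes back when the partition is set but unknown?", you follow `else` and then `store == null`, and never have to read the other branches.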