guozhangwang commented on a change in pull request #9020:
URL: https://github.com/apache/kafka/pull/9020#discussion_r494747319



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
##########
@@ -60,6 +60,34 @@ public void prepareTopology() throws InterruptedException {
         rightStream = builder.stream(INPUT_TOPIC_RIGHT);
     }
 
+    @Test
+    public void shouldNotAccessJoinStoresWhenGivingName() throws InterruptedException {

Review comment:
       A good coverage improvement! Thanks.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
##########
@@ -104,19 +95,11 @@ public StreamThreadStateStoreProvider(final StreamThread streamThread,
         }
     }
 
-    private TaskId createKeyTaskId(final String storeName, final Integer partition) {
-        if (partition == null) {
-            return null;
-        }
-        final List<String> sourceTopics = internalTopologyBuilder.stateStoreNameToSourceTopics().get(storeName);
-        final Set<String> sourceTopicsSet = new HashSet<>(sourceTopics);
-        final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = internalTopologyBuilder.topicGroups();
-        for (final Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> topicGroup : topicGroups.entrySet()) {
-            if (topicGroup.getValue().sourceTopics.containsAll(sourceTopicsSet)) {
-                return new TaskId(topicGroup.getKey(), partition);
-            }
-        }
-        throw new InvalidStateStoreException("Cannot get state store " + storeName + " because the requested partition " +
-            partition + " is not available on this instance");
+    private Optional<Task> findStreamTask(final Collection<Task> tasks, final String storeName, final int partition) {

Review comment:
       This is a great find, thanks!

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java
##########
@@ -56,25 +55,6 @@ public QueryableStoreProvider(final List<StreamThreadStateStoreProvider> storePr
         if (!globalStore.isEmpty()) {
             return queryableStoreType.create(globalStoreProvider, storeName);
         }
-        final List<T> allStores = new ArrayList<>();

Review comment:
       LGTM.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java
##########
@@ -46,11 +46,22 @@ public void setStoreQueryParameters(final StoreQueryParameters storeQueryParamet
     public <T> List<T> stores(final String storeName,
                               final QueryableStoreType<T> queryableStoreType) {
         final List<T> allStores = new ArrayList<>();
-        for (final StreamThreadStateStoreProvider provider : storeProviders) {
-            final List<T> stores = provider.stores(storeQueryParameters);
-            allStores.addAll(stores);
+        for (final StreamThreadStateStoreProvider storeProvider : storeProviders) {
+            final List<T> stores = storeProvider.stores(storeQueryParameters);
+            if (!stores.isEmpty()) {
+                allStores.addAll(stores);
+                if (storeQueryParameters.partition() != null) {
+                    break;
+                }
+            }
         }
         if (allStores.isEmpty()) {
+            if (storeQueryParameters.partition() != null) {
+                throw new InvalidStateStoreException(
+                        String.format("The specified partition %d for store %s does not exist.",

Review comment:
       Could you elaborate a bit more on this? If `allStores.isEmpty()` is 
true, it is always possible that either the specified store partition or, when 
the partition is `null`, the store itself does not exist on this client. Why 
are these treated as different failure cases?
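
       To make the question concrete, here is a minimal standalone sketch of 
the lookup as I read it from the diff. It is not the Kafka code: 
`StoreLookupSketch`, the `BiFunction` providers, and the use of 
`IllegalStateException` are hypothetical stand-ins, and the message for the 
`null`-partition case is an assumption since that branch is not shown in the 
hunk.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical stand-in for the lookup in the diff above; not Kafka's classes.
final class StoreLookupSketch {

    // Each provider maps (storeName, partition) to the stores it hosts.
    // A null partition means "every partition of that store".
    static <T> List<T> stores(final List<BiFunction<String, Integer, List<T>>> providers,
                              final String storeName,
                              final Integer partition) {
        final List<T> allStores = new ArrayList<>();
        for (final BiFunction<String, Integer, List<T>> provider : providers) {
            final List<T> stores = provider.apply(storeName, partition);
            if (!stores.isEmpty()) {
                allStores.addAll(stores);
                // A specific partition lives on at most one provider, so stop early.
                if (partition != null) {
                    break;
                }
            }
        }
        if (allStores.isEmpty()) {
            if (partition != null) {
                // Message text taken from the diff.
                throw new IllegalStateException(String.format(
                    "The specified partition %d for store %s does not exist.", partition, storeName));
            }
            // Assumed message; the real null-partition branch is not in the hunk.
            throw new IllegalStateException(
                "The state store " + storeName + " is not available on this instance.");
        }
        return allStores;
    }

    public static void main(final String[] args) {
        final List<BiFunction<String, Integer, List<String>>> providers = new ArrayList<>();
        // One provider that hosts only partition 0 of any store.
        providers.add((name, p) -> (p == null || p == 0)
            ? Collections.singletonList(name + "-0")
            : Collections.<String>emptyList());
        System.out.println(stores(providers, "s", null));
        System.out.println(stores(providers, "s", 0));
    }
}
```

       In both branches the caller only learns that nothing was found on this 
client, which is why I am asking whether two separate messages are needed.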




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

