This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d5eb3c1  HOTFIX: fix failing StreamsMetadataStateTest tests (#11590)
d5eb3c1 is described below

commit d5eb3c10ecd394015336868f948348c62c0e4e77
Author: A. Sophie Blee-Goldman <[email protected]>
AuthorDate: Thu Dec 9 16:19:56 2021 -0800

    HOTFIX: fix failing StreamsMetadataStateTest tests (#11590)
    
    Follow-up to #11562 to fix broken tests in StreamsMetadataStateTest
    
    Reviewers: Walker Carlson <[email protected]>
---
 .../processor/internals/StreamsMetadataState.java  | 42 +++++++++++++---------
 1 file changed, 26 insertions(+), 16 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
index 67cd436..7911c45 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
@@ -329,15 +329,6 @@ public class StreamsMetadataState {
         Stream.concat(activePartitionHostMap.keySet().stream(), 
standbyPartitionHostMap.keySet().stream())
             .distinct()
             .forEach(hostInfo -> {
-                final Map<String, Collection<String>> namedTopologyToStoreName 
= new HashMap<>();
-                final Set<String> topologyNames = 
topologyMetadata.namedTopologiesView();
-                topologyNames.forEach(topologyName -> {
-                        final Collection<String> storesOnHostForTopologyName = 
getStoresOnHost(storeToSourceTopics, activePartitionHostMap.get(hostInfo), 
topologyName);
-                        
storesOnHostForTopologyName.addAll(getStoresOnHost(storeToSourceTopics, 
standbyPartitionHostMap.get(hostInfo), topologyName));
-                        namedTopologyToStoreName.put(topologyName, 
storesOnHostForTopologyName);
-                    }
-                );
-
                 final Set<TopicPartition> activePartitionsOnHost = new 
HashSet<>();
                 final Set<String> activeStoresOnHost = new HashSet<>();
                 if (activePartitionHostMap.containsKey(hostInfo)) {
@@ -353,13 +344,32 @@ public class StreamsMetadataState {
                     
standbyStoresOnHost.addAll(getStoresOnHost(storeToSourceTopics, 
standbyPartitionsOnHost));
                 }
 
-                final StreamsMetadata metadata = new 
NamedTopologyStreamsMetadataImpl(
-                    hostInfo,
-                    activeStoresOnHost,
-                    activePartitionsOnHost,
-                    standbyStoresOnHost,
-                    standbyPartitionsOnHost,
-                    namedTopologyToStoreName);
+                final StreamsMetadata metadata;
+                if (topologyMetadata.hasNamedTopologies()) {
+                    final Map<String, Collection<String>> 
namedTopologyToStoreName = new HashMap<>();
+                    final Set<String> topologyNames = 
topologyMetadata.namedTopologiesView();
+                    topologyNames.forEach(topologyName -> {
+                        final Collection<String> storesOnHostForTopologyName = 
getStoresOnHost(storeToSourceTopics, activePartitionHostMap.get(hostInfo), 
topologyName);
+                        
storesOnHostForTopologyName.addAll(getStoresOnHost(storeToSourceTopics, 
standbyPartitionHostMap.get(hostInfo), topologyName));
+                        namedTopologyToStoreName.put(topologyName, 
storesOnHostForTopologyName);
+                    });
+
+                    metadata = new NamedTopologyStreamsMetadataImpl(
+                        hostInfo,
+                        activeStoresOnHost,
+                        activePartitionsOnHost,
+                        standbyStoresOnHost,
+                        standbyPartitionsOnHost,
+                        namedTopologyToStoreName);
+                } else {
+                    metadata = new StreamsMetadataImpl(
+                        hostInfo,
+                        activeStoresOnHost,
+                        activePartitionsOnHost,
+                        standbyStoresOnHost,
+                        standbyPartitionsOnHost
+                    );
+                }
                 rebuiltMetadata.add(metadata);
                 if (hostInfo.equals(thisHost)) {
                     localMetadata.set(metadata);

Reply via email to