This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d5eb3c1 HOTFIX: fix failing StreamsMetadataStateTest tests (#11590)
d5eb3c1 is described below
commit d5eb3c10ecd394015336868f948348c62c0e4e77
Author: A. Sophie Blee-Goldman <[email protected]>
AuthorDate: Thu Dec 9 16:19:56 2021 -0800
HOTFIX: fix failing StreamsMetadataStateTest tests (#11590)
Follow-up to #11562 to fix broken tests in StreamsMetadataStateTest.
Reviewers: Walker Carlson <[email protected]>
---
.../processor/internals/StreamsMetadataState.java | 42 +++++++++++++---------
1 file changed, 26 insertions(+), 16 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
index 67cd436..7911c45 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
@@ -329,15 +329,6 @@ public class StreamsMetadataState {
Stream.concat(activePartitionHostMap.keySet().stream(),
standbyPartitionHostMap.keySet().stream())
.distinct()
.forEach(hostInfo -> {
- final Map<String, Collection<String>> namedTopologyToStoreName
= new HashMap<>();
- final Set<String> topologyNames =
topologyMetadata.namedTopologiesView();
- topologyNames.forEach(topologyName -> {
- final Collection<String> storesOnHostForTopologyName =
getStoresOnHost(storeToSourceTopics, activePartitionHostMap.get(hostInfo),
topologyName);
-
storesOnHostForTopologyName.addAll(getStoresOnHost(storeToSourceTopics,
standbyPartitionHostMap.get(hostInfo), topologyName));
- namedTopologyToStoreName.put(topologyName,
storesOnHostForTopologyName);
- }
- );
-
final Set<TopicPartition> activePartitionsOnHost = new
HashSet<>();
final Set<String> activeStoresOnHost = new HashSet<>();
if (activePartitionHostMap.containsKey(hostInfo)) {
@@ -353,13 +344,32 @@ public class StreamsMetadataState {
standbyStoresOnHost.addAll(getStoresOnHost(storeToSourceTopics,
standbyPartitionsOnHost));
}
- final StreamsMetadata metadata = new
NamedTopologyStreamsMetadataImpl(
- hostInfo,
- activeStoresOnHost,
- activePartitionsOnHost,
- standbyStoresOnHost,
- standbyPartitionsOnHost,
- namedTopologyToStoreName);
+ final StreamsMetadata metadata;
+ if (topologyMetadata.hasNamedTopologies()) {
+ final Map<String, Collection<String>>
namedTopologyToStoreName = new HashMap<>();
+ final Set<String> topologyNames =
topologyMetadata.namedTopologiesView();
+ topologyNames.forEach(topologyName -> {
+ final Collection<String> storesOnHostForTopologyName =
getStoresOnHost(storeToSourceTopics, activePartitionHostMap.get(hostInfo),
topologyName);
+
storesOnHostForTopologyName.addAll(getStoresOnHost(storeToSourceTopics,
standbyPartitionHostMap.get(hostInfo), topologyName));
+ namedTopologyToStoreName.put(topologyName,
storesOnHostForTopologyName);
+ });
+
+ metadata = new NamedTopologyStreamsMetadataImpl(
+ hostInfo,
+ activeStoresOnHost,
+ activePartitionsOnHost,
+ standbyStoresOnHost,
+ standbyPartitionsOnHost,
+ namedTopologyToStoreName);
+ } else {
+ metadata = new StreamsMetadataImpl(
+ hostInfo,
+ activeStoresOnHost,
+ activePartitionsOnHost,
+ standbyStoresOnHost,
+ standbyPartitionsOnHost
+ );
+ }
rebuiltMetadata.add(metadata);
if (hostInfo.equals(thisHost)) {
localMetadata.set(metadata);