guozhangwang commented on a change in pull request #11609:
URL: https://github.com/apache/kafka/pull/11609#discussion_r788356598
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java ##########
@@ -428,7 +469,7 @@ private void rebuildMetadata(final Map<HostInfo, Set<TopicPartition>> activePart
             final Set<HostInfo> standbyHosts = new HashSet<>();
             for (final StreamsMetadata streamsMetadata : allMetadata) {
                 if (streamsMetadata instanceof NamedTopologyStreamsMetadataImpl
-                    && ((NamedTopologyStreamsMetadataImpl) streamsMetadata).namedTopologyToStoreNames().get(topologyName).contains(storeName)) {
+                    && ((NamedTopologyStreamsMetadataImpl) streamsMetadata).storeNamesByTopology().get(topologyName).contains(storeName)) {

Review comment:
Hmm... this doesn't seem right to me. Below, we want to collect both the active and the standby host infos in the `KeyQueryMetadata`, but here we only enter the branch if the active store names contain the store. If that gets fixed, then `storeNamesByTopology()` and `standbyStoreNamesByTopology()` would always appear together, so I'm wondering: is it necessary to keep them in two separate fields with two getter functions?

########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java ##########
@@ -428,7 +469,7 @@ private void rebuildMetadata(final Map<HostInfo, Set<TopicPartition>> activePart
             final Set<HostInfo> standbyHosts = new HashSet<>();
             for (final StreamsMetadata streamsMetadata : allMetadata) {
                 if (streamsMetadata instanceof NamedTopologyStreamsMetadataImpl
-                    && ((NamedTopologyStreamsMetadataImpl) streamsMetadata).namedTopologyToStoreNames().get(topologyName).contains(storeName)) {
+                    && ((NamedTopologyStreamsMetadataImpl) streamsMetadata).storeNamesByTopology().get(topologyName).contains(storeName)) {

Review comment:
This comment is about lines 473-476 below: should we use the `topologyName` there as well? Otherwise, e.g.

```
final Set<String> standbyStateStoreNames = streamsMetadata.standbyStateStoreNames();
```

would be fetching the standby store names for the `UNNAMED` topology, which doesn't seem correct.

########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java ##########
@@ -324,6 +345,45 @@ private void rebuildMetadata(final Map<HostInfo, Set<TopicPartition>> activePart
             return;
         }
+        allMetadata = topologyMetadata.hasNamedTopologies() ?
+            rebuildMetadataForNamedTopologies(activePartitionHostMap, standbyPartitionHostMap) :
+            rebuildMetadataForSingleTopology(activePartitionHostMap, standbyPartitionHostMap);
+    }
+
+    private List<StreamsMetadata> rebuildMetadataForNamedTopologies(final Map<HostInfo, Set<TopicPartition>> activePartitionHostMap,
+                                                                    final Map<HostInfo, Set<TopicPartition>> standbyPartitionHostMap) {
+        final List<StreamsMetadata> rebuiltMetadata = new ArrayList<>();
+        Stream.concat(activePartitionHostMap.keySet().stream(), standbyPartitionHostMap.keySet().stream())
+            .distinct()
+            .forEach(hostInfo -> {
+                final Set<TopicPartition> activePartitionsOnHost = new HashSet<>();
+                final Set<TopicPartition> standbyPartitionsOnHost = new HashSet<>();
+
+                final Map<String, Set<String>> storeNamesByTopology = new HashMap<>();
+                final Map<String, Set<String>> standbyStoreNamesByTopology = new HashMap<>();
+
+                topologyMetadata.namedTopologiesView().forEach(topologyName -> {
+                    // WIP

Review comment:
?? Still WIP?
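A minimal sketch of the check the first two comments seem to ask for: each host's active and standby store names are looked up separately, and both lookups are scoped to the same `topologyName`. `HostStores`, `activeHosts`, `standbyHosts`, and `sampleMetadata` are hypothetical, simplified stand-ins (hosts are plain strings rather than `HostInfo`, and partition matching is omitted); they are not the actual `NamedTopologyStreamsMetadataImpl` API.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class StoreHostLookupSketch {

    // Hypothetical, simplified per-host metadata (not the real Kafka Streams class):
    // store names keyed by topology name, kept separately for active and standby replicas.
    static final class HostStores {
        final String host;
        final Map<String, Set<String>> activeStoresByTopology;
        final Map<String, Set<String>> standbyStoresByTopology;

        HostStores(final String host,
                   final Map<String, Set<String>> activeStoresByTopology,
                   final Map<String, Set<String>> standbyStoresByTopology) {
            this.host = host;
            this.activeStoresByTopology = activeStoresByTopology;
            this.standbyStoresByTopology = standbyStoresByTopology;
        }
    }

    // True if storeName is registered under topologyName; getOrDefault avoids an
    // NPE when the host has no stores at all for that topology.
    private static boolean hasStore(final Map<String, Set<String>> storesByTopology,
                                    final String topologyName,
                                    final String storeName) {
        return storesByTopology.getOrDefault(topologyName, Collections.emptySet()).contains(storeName);
    }

    // Hosts holding an active replica of storeName in topologyName.
    static Set<String> activeHosts(final List<HostStores> allMetadata,
                                   final String topologyName,
                                   final String storeName) {
        final Set<String> hosts = new LinkedHashSet<>();
        for (final HostStores metadata : allMetadata) {
            if (hasStore(metadata.activeStoresByTopology, topologyName, storeName)) {
                hosts.add(metadata.host);
            }
        }
        return hosts;
    }

    // Hosts holding a standby replica of storeName, scoped to the same
    // topologyName rather than to the unnamed topology.
    static Set<String> standbyHosts(final List<HostStores> allMetadata,
                                    final String topologyName,
                                    final String storeName) {
        final Set<String> hosts = new LinkedHashSet<>();
        for (final HostStores metadata : allMetadata) {
            if (hasStore(metadata.standbyStoresByTopology, topologyName, storeName)) {
                hosts.add(metadata.host);
            }
        }
        return hosts;
    }

    // Sample data: host-b only has a standby of "counts"; host-c has an
    // unrelated store with the same name in a different topology.
    static List<HostStores> sampleMetadata() {
        return List.of(
            new HostStores("host-a", Map.of("topology-1", Set.of("counts")), Map.of()),
            new HostStores("host-b", Map.of(), Map.of("topology-1", Set.of("counts"))),
            new HostStores("host-c", Map.of("topology-2", Set.of("counts")), Map.of()));
    }

    public static void main(final String[] args) {
        final List<HostStores> all = sampleMetadata();
        System.out.println(activeHosts(all, "topology-1", "counts"));  // [host-a]
        System.out.println(standbyHosts(all, "topology-1", "counts")); // [host-b]
    }
}
```

In the sample data, `host-b` holds `counts` only as a standby, so a filter on active store names alone would skip it, while `host-c` defines a same-named store in `topology-2`, which the topology-scoped lookup ignores.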
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java ##########
@@ -113,7 +113,7 @@ public StreamsMetadata getLocalMetadata() {
             return allMetadata;
         }
-        final Collection<String> sourceTopics = topologyMetadata.sourceTopicsForStore(storeName);
+        final Collection<String> sourceTopics = topologyMetadata.sourceTopicsForStore(storeName, null);

Review comment:
Before this change, `StreamsMetadataState#getAllMetadataForStore(string)` returned the metadata for the store name across all topologies (multiple topologies may have stores with the same name), whereas now it only returns metadata for the `UNNAMED_TOPOLOGY`. Is this change intentional?

########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java ##########
@@ -296,9 +316,10 @@ private boolean hasPartitionsForAnyTopics(final List<String> topicNames, final S
     }
     private Set<String> getStoresOnHost(final Map<String, List<String>> storeToSourceTopics,
-                                        final Set<TopicPartition> sourceTopicPartitions, final String topologyName) {
+                                        final Set<TopicPartition> sourceTopicPartitions,
+                                        final String topologyName) {
         final InternalTopologyBuilder builder = topologyMetadata.lookupBuilderForNamedTopology(topologyName);
-        final Set<String> sourceTopicNames = builder.sourceTopicNames();
+        final Collection<String> sourceTopicNames = builder.fullSourceTopicNames();

Review comment:
Just to clarify, this is the actual fix, right?

########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java ##########
@@ -324,6 +345,45 @@ private void rebuildMetadata(final Map<HostInfo, Set<TopicPartition>> activePart
             return;
         }
+        allMetadata = topologyMetadata.hasNamedTopologies() ?
+            rebuildMetadataForNamedTopologies(activePartitionHostMap, standbyPartitionHostMap) :
+            rebuildMetadataForSingleTopology(activePartitionHostMap, standbyPartitionHostMap);
+    }
+
+    private List<StreamsMetadata> rebuildMetadataForNamedTopologies(final Map<HostInfo, Set<TopicPartition>> activePartitionHostMap,
+                                                                    final Map<HostInfo, Set<TopicPartition>> standbyPartitionHostMap) {
+        final List<StreamsMetadata> rebuiltMetadata = new ArrayList<>();
+        Stream.concat(activePartitionHostMap.keySet().stream(), standbyPartitionHostMap.keySet().stream())
+            .distinct()
+            .forEach(hostInfo -> {
+                final Set<TopicPartition> activePartitionsOnHost = new HashSet<>();
+                final Set<TopicPartition> standbyPartitionsOnHost = new HashSet<>();
+
+                final Map<String, Set<String>> storeNamesByTopology = new HashMap<>();
+                final Map<String, Set<String>> standbyStoreNamesByTopology = new HashMap<>();
+
+                topologyMetadata.namedTopologiesView().forEach(topologyName -> {
+                    // WIP
+                });
+
+                final NamedTopologyStreamsMetadataImpl metadata = new NamedTopologyStreamsMetadataImpl(
+                    hostInfo,
+                    storeNamesByTopology,

Review comment:
Since this PR is not complete yet (btw, could you rename it to indicate it's WIP?), I'm not sure what the motivation is for changing this to a mapping from topology to store names. But I feel we may not need to maintain two separate mappings for active and standby stores, since we always check whether the store name is in either of them. Is that right?
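The last comment suggests folding the two per-host mappings into one. Assuming every lookup only asks whether a host has a store in either role, a minimal sketch could merge the active and standby maps into a single topology-to-store-names map that supports both a topology-scoped lookup and an unscoped one spanning all topologies (the "across all topologies" semantics described for the pre-change `getAllMetadataForStore`). `mergeByTopology`, `hasStore`, and `hasStoreInAnyTopology` are hypothetical helpers, not Kafka Streams APIs.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class MergedStoreNamesSketch {

    // Hypothetical helper: folds the active and standby maps into one
    // topology -> store-names map. Inputs are not modified; each value is a new set.
    static Map<String, Set<String>> mergeByTopology(final Map<String, Set<String>> activeByTopology,
                                                    final Map<String, Set<String>> standbyByTopology) {
        final Map<String, Set<String>> merged = new HashMap<>();
        activeByTopology.forEach((topology, stores) ->
            merged.computeIfAbsent(topology, t -> new HashSet<>()).addAll(stores));
        standbyByTopology.forEach((topology, stores) ->
            merged.computeIfAbsent(topology, t -> new HashSet<>()).addAll(stores));
        return merged;
    }

    // Scoped lookup: does the host have storeName (active or standby) for this topology?
    static boolean hasStore(final Map<String, Set<String>> merged,
                            final String topologyName,
                            final String storeName) {
        return merged.getOrDefault(topologyName, Collections.emptySet()).contains(storeName);
    }

    // Unscoped lookup: does the host have storeName under any topology?
    // Several named topologies may define stores with the same name.
    static boolean hasStoreInAnyTopology(final Map<String, Set<String>> merged,
                                         final String storeName) {
        return merged.values().stream().anyMatch(stores -> stores.contains(storeName));
    }

    public static void main(final String[] args) {
        final Map<String, Set<String>> merged = mergeByTopology(
            Map.of("topology-1", Set.of("counts")),
            Map.of("topology-1", Set.of("totals"), "topology-2", Set.of("counts")));
        System.out.println(hasStore(merged, "topology-1", "totals")); // true
        System.out.println(hasStore(merged, "topology-2", "totals")); // false
        System.out.println(hasStoreInAnyTopology(merged, "counts"));  // true
    }
}
```

The trade-off is that a merged map can no longer tell an active replica from a standby one. Since `KeyQueryMetadata` reports the active host and the standby hosts separately, any caller that needs that distinction would still have to get it from another source.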