guozhangwang commented on a change in pull request #11609:
URL: https://github.com/apache/kafka/pull/11609#discussion_r788356598



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
##########
@@ -428,7 +469,7 @@ private void rebuildMetadata(final Map<HostInfo, 
Set<TopicPartition>> activePart
         final Set<HostInfo> standbyHosts = new HashSet<>();
         for (final StreamsMetadata streamsMetadata : allMetadata) {
             if (streamsMetadata instanceof NamedTopologyStreamsMetadataImpl
-                && ((NamedTopologyStreamsMetadataImpl) 
streamsMetadata).namedTopologyToStoreNames().get(topologyName).contains(storeName))
 {
+                && ((NamedTopologyStreamsMetadataImpl) 
streamsMetadata).storeNamesByTopology().get(topologyName).contains(storeName)) {

Review comment:
       Hmm.. this seems not right to me. Below we want to collect both active 
and standby host infos in the `KeyQueryMetadata`, but here we would only enter 
if the active store names contains it.
   
   If that should be fixed, then it seems that `storeNamesByTopology()` and 
`standbyStoreNamesByTopology()` always appears hand-in-hand, hence I'm 
wondering if it's necessary to keep them in two fields and two getter functions?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
##########
@@ -428,7 +469,7 @@ private void rebuildMetadata(final Map<HostInfo, 
Set<TopicPartition>> activePart
         final Set<HostInfo> standbyHosts = new HashSet<>();
         for (final StreamsMetadata streamsMetadata : allMetadata) {
             if (streamsMetadata instanceof NamedTopologyStreamsMetadataImpl
-                && ((NamedTopologyStreamsMetadataImpl) 
streamsMetadata).namedTopologyToStoreNames().get(topologyName).contains(storeName))
 {
+                && ((NamedTopologyStreamsMetadataImpl) 
streamsMetadata).storeNamesByTopology().get(topologyName).contains(storeName)) {

Review comment:
       This comment is the lines 473 - 476 below: should we use the 
`topologyName` as well? Otherwise, e.g. 
   
   ```
   final Set<String> standbyStateStoreNames = 
streamsMetadata.standbyStateStoreNames();
   ```
   
   Would be getting for the `UNNAMED` topology, which seems not correct?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
##########
@@ -324,6 +345,45 @@ private void rebuildMetadata(final Map<HostInfo, 
Set<TopicPartition>> activePart
             return;
         }
 
+        allMetadata = topologyMetadata.hasNamedTopologies() ?
+            rebuildMetadataForNamedTopologies(activePartitionHostMap, 
standbyPartitionHostMap) :
+            rebuildMetadataForSingleTopology(activePartitionHostMap, 
standbyPartitionHostMap);
+    }
+
+    private List<StreamsMetadata> rebuildMetadataForNamedTopologies(final 
Map<HostInfo, Set<TopicPartition>> activePartitionHostMap,
+                                                                    final 
Map<HostInfo, Set<TopicPartition>> standbyPartitionHostMap) {
+        final List<StreamsMetadata> rebuiltMetadata = new ArrayList<>();
+        Stream.concat(activePartitionHostMap.keySet().stream(), 
standbyPartitionHostMap.keySet().stream())
+            .distinct()
+            .forEach(hostInfo -> {
+                final Set<TopicPartition> activePartitionsOnHost = new 
HashSet<>();
+                final Set<TopicPartition> standbyPartitionsOnHost = new 
HashSet<>();
+
+                final Map<String, Set<String>> storeNamesByTopology = new 
HashMap<>();
+                final Map<String, Set<String>> standbyStoreNamesByTopology = 
new HashMap<>();
+
+                topologyMetadata.namedTopologiesView().forEach(topologyName -> 
{
+                    // WIP 

Review comment:
       ?? Still WIP?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
##########
@@ -113,7 +113,7 @@ public StreamsMetadata getLocalMetadata() {
             return allMetadata;
         }
 
-        final Collection<String> sourceTopics = 
topologyMetadata.sourceTopicsForStore(storeName);
+        final Collection<String> sourceTopics = 
topologyMetadata.sourceTopicsForStore(storeName, null);

Review comment:
       Before this change, 
`StreamsMetadataState#getAllMetadataForStore(string)` returns the metadata for 
the store names across all topologies (it's possible that multiple topologies 
have stores with the same name), whereas now it is only returning for the 
`UNNAMED_TOPOLOGY`, is this change intentional?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
##########
@@ -296,9 +316,10 @@ private boolean hasPartitionsForAnyTopics(final 
List<String> topicNames, final S
     }
 
     private Set<String> getStoresOnHost(final Map<String, List<String>> 
storeToSourceTopics,
-        final Set<TopicPartition> sourceTopicPartitions, final String 
topologyName) {
+                                        final Set<TopicPartition> 
sourceTopicPartitions,
+                                        final String topologyName) {
         final InternalTopologyBuilder builder = 
topologyMetadata.lookupBuilderForNamedTopology(topologyName);
-        final Set<String> sourceTopicNames = builder.sourceTopicNames();
+        final Collection<String> sourceTopicNames = 
builder.fullSourceTopicNames();

Review comment:
       Just to clarify this is the actual fix right?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
##########
@@ -324,6 +345,45 @@ private void rebuildMetadata(final Map<HostInfo, 
Set<TopicPartition>> activePart
             return;
         }
 
+        allMetadata = topologyMetadata.hasNamedTopologies() ?
+            rebuildMetadataForNamedTopologies(activePartitionHostMap, 
standbyPartitionHostMap) :
+            rebuildMetadataForSingleTopology(activePartitionHostMap, 
standbyPartitionHostMap);
+    }
+
+    private List<StreamsMetadata> rebuildMetadataForNamedTopologies(final 
Map<HostInfo, Set<TopicPartition>> activePartitionHostMap,
+                                                                    final 
Map<HostInfo, Set<TopicPartition>> standbyPartitionHostMap) {
+        final List<StreamsMetadata> rebuiltMetadata = new ArrayList<>();
+        Stream.concat(activePartitionHostMap.keySet().stream(), 
standbyPartitionHostMap.keySet().stream())
+            .distinct()
+            .forEach(hostInfo -> {
+                final Set<TopicPartition> activePartitionsOnHost = new 
HashSet<>();
+                final Set<TopicPartition> standbyPartitionsOnHost = new 
HashSet<>();
+
+                final Map<String, Set<String>> storeNamesByTopology = new 
HashMap<>();
+                final Map<String, Set<String>> standbyStoreNamesByTopology = 
new HashMap<>();
+
+                topologyMetadata.namedTopologiesView().forEach(topologyName -> 
{
+                    // WIP 
+                });
+
+                final NamedTopologyStreamsMetadataImpl metadata = new 
NamedTopologyStreamsMetadataImpl(
+                    hostInfo,
+                    storeNamesByTopology,

Review comment:
       Since this PR is not completed yet (btw could you rename it to indicate 
it's WIP?), I'm not sure what's the motivation from changing it to a mapping 
from topology to names, but I feel we may not need to maintain two mappings for 
active and standby since we are always trying to check if store name is in 
either of them, is that right?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to