guozhangwang commented on a change in pull request #11609:
URL: https://github.com/apache/kafka/pull/11609#discussion_r788356598
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
##########
@@ -428,7 +469,7 @@ private void rebuildMetadata(final Map<HostInfo,
Set<TopicPartition>> activePart
final Set<HostInfo> standbyHosts = new HashSet<>();
for (final StreamsMetadata streamsMetadata : allMetadata) {
if (streamsMetadata instanceof NamedTopologyStreamsMetadataImpl
- && ((NamedTopologyStreamsMetadataImpl)
streamsMetadata).namedTopologyToStoreNames().get(topologyName).contains(storeName))
{
+ && ((NamedTopologyStreamsMetadataImpl)
streamsMetadata).storeNamesByTopology().get(topologyName).contains(storeName)) {
Review comment:
Hmm, this doesn't seem right to me. Below we want to collect both active
and standby host infos in the `KeyQueryMetadata`, but here we only enter the
branch if the active store names contain the store.
If that should be fixed, then `storeNamesByTopology()` and
`standbyStoreNamesByTopology()` always appear hand-in-hand, so I'm
wondering whether it's necessary to keep them as two fields and two getter functions?
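
To illustrate the check I have in mind, here is a minimal sketch, not the actual `StreamsMetadataState` code. The class `StoreLookupSketch` and the method `hostsStore` are hypothetical names, and the two maps stand in for whatever `storeNamesByTopology()` and `standbyStoreNamesByTopology()` return. It also assumes a missing topology entry should be treated as "no stores" rather than throwing.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;

public class StoreLookupSketch {

    // Hypothetical helper: true if the host has the store for the given
    // topology either as an active store or as a standby store.
    public static boolean hostsStore(final Map<String, Set<String>> activeStoresByTopology,
                                     final Map<String, Set<String>> standbyStoresByTopology,
                                     final String topologyName,
                                     final String storeName) {
        // getOrDefault avoids a NullPointerException when the topology
        // has no entry in one of the maps.
        final Set<String> active =
            activeStoresByTopology.getOrDefault(topologyName, Collections.emptySet());
        final Set<String> standby =
            standbyStoresByTopology.getOrDefault(topologyName, Collections.emptySet());
        return active.contains(storeName) || standby.contains(storeName);
    }
}
```

With a check like this, a host that only holds a standby copy of the store would still be considered when collecting host infos.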
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
##########
@@ -428,7 +469,7 @@ private void rebuildMetadata(final Map<HostInfo,
Set<TopicPartition>> activePart
final Set<HostInfo> standbyHosts = new HashSet<>();
for (final StreamsMetadata streamsMetadata : allMetadata) {
if (streamsMetadata instanceof NamedTopologyStreamsMetadataImpl
- && ((NamedTopologyStreamsMetadataImpl)
streamsMetadata).namedTopologyToStoreNames().get(topologyName).contains(storeName))
{
+ && ((NamedTopologyStreamsMetadataImpl)
streamsMetadata).storeNamesByTopology().get(topologyName).contains(storeName)) {
Review comment:
This comment is about lines 473-476 below: should we use the
`topologyName` there as well? Otherwise, e.g.
```
final Set<String> standbyStateStoreNames =
streamsMetadata.standbyStateStoreNames();
```
would fetch the standby store names for the `UNNAMED` topology, which does not seem correct?
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
##########
@@ -324,6 +345,45 @@ private void rebuildMetadata(final Map<HostInfo,
Set<TopicPartition>> activePart
return;
}
+ allMetadata = topologyMetadata.hasNamedTopologies() ?
+ rebuildMetadataForNamedTopologies(activePartitionHostMap,
standbyPartitionHostMap) :
+ rebuildMetadataForSingleTopology(activePartitionHostMap,
standbyPartitionHostMap);
+ }
+
+ private List<StreamsMetadata> rebuildMetadataForNamedTopologies(final
Map<HostInfo, Set<TopicPartition>> activePartitionHostMap,
+ final
Map<HostInfo, Set<TopicPartition>> standbyPartitionHostMap) {
+ final List<StreamsMetadata> rebuiltMetadata = new ArrayList<>();
+ Stream.concat(activePartitionHostMap.keySet().stream(),
standbyPartitionHostMap.keySet().stream())
+ .distinct()
+ .forEach(hostInfo -> {
+ final Set<TopicPartition> activePartitionsOnHost = new
HashSet<>();
+ final Set<TopicPartition> standbyPartitionsOnHost = new
HashSet<>();
+
+ final Map<String, Set<String>> storeNamesByTopology = new
HashMap<>();
+ final Map<String, Set<String>> standbyStoreNamesByTopology =
new HashMap<>();
+
+ topologyMetadata.namedTopologiesView().forEach(topologyName ->
{
+ // WIP
Review comment:
?? Still WIP?
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
##########
@@ -113,7 +113,7 @@ public StreamsMetadata getLocalMetadata() {
return allMetadata;
}
- final Collection<String> sourceTopics =
topologyMetadata.sourceTopicsForStore(storeName);
+ final Collection<String> sourceTopics =
topologyMetadata.sourceTopicsForStore(storeName, null);
Review comment:
Before this change,
`StreamsMetadataState#getAllMetadataForStore(String)` returned the metadata for
the given store name across all topologies (multiple topologies may
have stores with the same name), whereas now it only returns metadata for the
`UNNAMED_TOPOLOGY`. Is this change intentional?
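
For reference, the "across all topologies" semantics I am describing could look roughly like the sketch below. This is not the real `TopologyMetadata#sourceTopicsForStore`; the class `SourceTopicsLookupSketch`, the nested map layout (topology name to store name to source topics), and the convention that a `null` topology name means "every topology" are all assumptions made for illustration.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class SourceTopicsLookupSketch {

    // Hypothetical lookup: collects the source topics of a store.
    // A null topologyName is treated as "search every topology";
    // otherwise only the named topology is consulted.
    public static Set<String> sourceTopicsForStore(
            final Map<String, Map<String, List<String>>> storeToSourceTopicsByTopology,
            final String storeName,
            final String topologyName) {
        final Set<String> topics = new TreeSet<>();
        for (final Map.Entry<String, Map<String, List<String>>> entry
                : storeToSourceTopicsByTopology.entrySet()) {
            if (topologyName == null || topologyName.equals(entry.getKey())) {
                // Stores with the same name in different topologies all contribute.
                topics.addAll(entry.getValue().getOrDefault(storeName, Collections.emptyList()));
            }
        }
        return topics;
    }
}
```

If `null` is instead meant to select only the unnamed topology, callers that relied on the cross-topology result would silently get less metadata back.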
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
##########
@@ -296,9 +316,10 @@ private boolean hasPartitionsForAnyTopics(final
List<String> topicNames, final S
}
private Set<String> getStoresOnHost(final Map<String, List<String>>
storeToSourceTopics,
- final Set<TopicPartition> sourceTopicPartitions, final String
topologyName) {
+ final Set<TopicPartition>
sourceTopicPartitions,
+ final String topologyName) {
final InternalTopologyBuilder builder =
topologyMetadata.lookupBuilderForNamedTopology(topologyName);
- final Set<String> sourceTopicNames = builder.sourceTopicNames();
+ final Collection<String> sourceTopicNames =
builder.fullSourceTopicNames();
Review comment:
Just to clarify: this is the actual fix, right?
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
##########
@@ -324,6 +345,45 @@ private void rebuildMetadata(final Map<HostInfo,
Set<TopicPartition>> activePart
return;
}
+ allMetadata = topologyMetadata.hasNamedTopologies() ?
+ rebuildMetadataForNamedTopologies(activePartitionHostMap,
standbyPartitionHostMap) :
+ rebuildMetadataForSingleTopology(activePartitionHostMap,
standbyPartitionHostMap);
+ }
+
+ private List<StreamsMetadata> rebuildMetadataForNamedTopologies(final
Map<HostInfo, Set<TopicPartition>> activePartitionHostMap,
+ final
Map<HostInfo, Set<TopicPartition>> standbyPartitionHostMap) {
+ final List<StreamsMetadata> rebuiltMetadata = new ArrayList<>();
+ Stream.concat(activePartitionHostMap.keySet().stream(),
standbyPartitionHostMap.keySet().stream())
+ .distinct()
+ .forEach(hostInfo -> {
+ final Set<TopicPartition> activePartitionsOnHost = new
HashSet<>();
+ final Set<TopicPartition> standbyPartitionsOnHost = new
HashSet<>();
+
+ final Map<String, Set<String>> storeNamesByTopology = new
HashMap<>();
+ final Map<String, Set<String>> standbyStoreNamesByTopology =
new HashMap<>();
+
+ topologyMetadata.namedTopologiesView().forEach(topologyName ->
{
+ // WIP
+ });
+
+ final NamedTopologyStreamsMetadataImpl metadata = new
NamedTopologyStreamsMetadataImpl(
+ hostInfo,
+ storeNamesByTopology,
Review comment:
Since this PR is not complete yet (btw, could you rename it to indicate
it's WIP?), I'm not sure what the motivation is for changing this to a mapping
from topology to store names. But I feel we may not need to maintain two mappings for
active and standby, since we always check whether the store name is in
either of them. Is that right?
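
One possible shape for a single combined mapping is sketched below. This is only an illustration of the idea, not a proposal for the exact API: `CombinedStoreIndexSketch`, its `Role` enum, and its methods are hypothetical names that do not exist in Kafka.

```java
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class CombinedStoreIndexSketch {

    // Hypothetical marker for how a host holds a store.
    public enum Role { ACTIVE, STANDBY }

    // topology name -> store name -> roles this host has for that store
    private final Map<String, Map<String, Set<Role>>> rolesByTopologyAndStore = new HashMap<>();

    public void add(final String topologyName, final String storeName, final Role role) {
        rolesByTopologyAndStore
            .computeIfAbsent(topologyName, t -> new HashMap<>())
            .computeIfAbsent(storeName, s -> EnumSet.noneOf(Role.class))
            .add(role);
    }

    // True if the host has the store in any role (active or standby).
    public boolean hostsStore(final String topologyName, final String storeName) {
        return rolesByTopologyAndStore
            .getOrDefault(topologyName, Collections.emptyMap())
            .containsKey(storeName);
    }

    // True only if the host has the store in the given role.
    public boolean hostsStoreAs(final String topologyName, final String storeName, final Role role) {
        return rolesByTopologyAndStore
            .getOrDefault(topologyName, Collections.emptyMap())
            .getOrDefault(storeName, Collections.emptySet())
            .contains(role);
    }
}
```

A single structure like this keeps the "is the store on this host at all" check in one place, while still letting callers distinguish active from standby when they need to.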