This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 513e1c641d6 KAFKA-14539: Simplify StreamsMetadataState by replacing 
the Cluster metadata with partition info map (#13751)
513e1c641d6 is described below

commit 513e1c641d63c5e15144f9fcdafa1b56c5e5ba09
Author: Danica Fine <[email protected]>
AuthorDate: Wed Jun 7 21:35:11 2023 +0200

    KAFKA-14539: Simplify StreamsMetadataState by replacing the Cluster 
metadata with partition info map (#13751)
    
    Replace usage of Cluster in StreamsMetadataState with Map<String, List>. 
Update StreamsPartitionAssignor#onAssignment method to pass existing 
Map<TopicPartition, PartitionInfo> instead of fake Cluster object.
    
    Behavior remains the same; updated existing unit tests accordingly.
    
    Reviewers:  Walker Carlson <[email protected]>, Bill Bejeck 
<[email protected]>
---
 .../processor/internals/StreamsMetadataState.java  | 23 +++++++-----
 .../internals/StreamsPartitionAssignor.java        |  3 +-
 .../internals/StreamsMetadataStateTest.java        | 41 ++++++++++------------
 .../internals/StreamsPartitionAssignorTest.java    | 11 +++---
 4 files changed, 40 insertions(+), 38 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
index 49fa34bc510..41f93c88ed5 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
@@ -22,7 +22,6 @@ import java.util.function.Function;
 import java.util.Optional;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
-import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.Serializer;
@@ -40,6 +39,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -59,7 +59,7 @@ public class StreamsMetadataState {
     private final Set<String> globalStores;
     private final HostInfo thisHost;
     private List<StreamsMetadata> allMetadata = Collections.emptyList();
-    private Cluster clusterMetadata;
+    private Map<String, List<PartitionInfo>> partitionsByTopic;
     private final AtomicReference<StreamsMetadata> localMetadata = new 
AtomicReference<>(null);
 
     public StreamsMetadataState(final TopologyMetadata topologyMetadata,
@@ -81,7 +81,7 @@ public class StreamsMetadataState {
         builder.append(indent).append("GlobalMetadata: 
").append(allMetadata).append("\n");
         builder.append(indent).append("GlobalStores: 
").append(globalStores).append("\n");
         builder.append(indent).append("My HostInfo: 
").append(thisHost).append("\n");
-        builder.append(indent).append(clusterMetadata).append("\n");
+        builder.append(indent).append("PartitionsByTopic: 
").append(partitionsByTopic).append("\n");
 
         return builder.toString();
     }
@@ -308,12 +308,17 @@ public class StreamsMetadataState {
      *
      * @param activePartitionHostMap  the current mapping of {@link HostInfo} 
-> {@link TopicPartition}s for active partitions
      * @param standbyPartitionHostMap the current mapping of {@link HostInfo} 
-> {@link TopicPartition}s for standby partitions
-     * @param clusterMetadata         the current clusterMetadata {@link 
Cluster}
+     * @param topicPartitionInfo      the current mapping of {@link 
TopicPartition} -> {@link PartitionInfo}
      */
     synchronized void onChange(final Map<HostInfo, Set<TopicPartition>> 
activePartitionHostMap,
                                final Map<HostInfo, Set<TopicPartition>> 
standbyPartitionHostMap,
-                               final Cluster clusterMetadata) {
-        this.clusterMetadata = clusterMetadata;
+                               final Map<TopicPartition, PartitionInfo> 
topicPartitionInfo) {
+        this.partitionsByTopic = new HashMap<>();
+        topicPartitionInfo.entrySet().forEach(entry -> this.partitionsByTopic
+                .computeIfAbsent(entry.getKey().topic(), topic -> new 
ArrayList<>())
+                .add(entry.getValue())
+        );
+
         rebuildMetadata(activePartitionHostMap, standbyPartitionHostMap);
     }
 
@@ -558,7 +563,7 @@ public class StreamsMetadataState {
     }
 
     private boolean isInitialized() {
-        return clusterMetadata != null && !clusterMetadata.topics().isEmpty() 
&& localMetadata.get() != null;
+        return partitionsByTopic != null && !partitionsByTopic.isEmpty() && 
localMetadata.get() != null;
     }
 
     public String getStoreForChangelogTopic(final String topicName) {
@@ -573,10 +578,10 @@ public class StreamsMetadataState {
         private SourceTopicsInfo(final List<String> sourceTopics) {
             this.sourceTopics = sourceTopics;
             for (final String topic : sourceTopics) {
-                final List<PartitionInfo> partitions = 
clusterMetadata.partitionsForTopic(topic);
+                final List<PartitionInfo> partitions = 
partitionsByTopic.getOrDefault(topic, Collections.emptyList());
                 if (partitions.size() > maxPartitions) {
                     maxPartitions = partitions.size();
-                    topicWithMostPartitions = partitions.get(0).topic();
+                    topicWithMostPartitions = topic;
                 }
             }
         }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 54ab6e8fa0e..64f80f7963f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -1344,8 +1344,7 @@ public class StreamsPartitionAssignor implements 
ConsumerPartitionAssignor, Conf
             partitionsByHost.keySet()
         );
 
-        final Cluster fakeCluster = 
Cluster.empty().withPartitions(topicToPartitionInfo);
-        streamsMetadataState.onChange(partitionsByHost, 
standbyPartitionsByHost, fakeCluster);
+        streamsMetadataState.onChange(partitionsByHost, 
standbyPartitionsByHost, topicToPartitionInfo);
 
         // we do not capture any exceptions but just let the exception thrown 
from consumer.poll directly
         // since when stream thread captures it, either we close all tasks as 
dirty or we close thread
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
index 2343ece6702..8113b41ca93 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
@@ -16,8 +16,6 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.Serdes;
@@ -37,12 +35,10 @@ import 
org.apache.kafka.streams.state.internals.StreamsMetadataImpl;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.util.Arrays;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.Optional;
@@ -73,7 +69,7 @@ public class StreamsMetadataStateTest {
     private TopicPartition topic1P1;
     private TopicPartition topic2P1;
     private TopicPartition topic4P0;
-    private Cluster cluster;
+    private Map<TopicPartition, PartitionInfo> partitionInfos;
     private final String globalTable = "global-table";
     private final LogContext logContext = new LogContext(String.format("test 
[%s] ", "StreamsMetadataStateTest"));
     private StreamPartitioner<String, Object> partitioner;
@@ -121,19 +117,18 @@ public class StreamsMetadataStateTest {
         hostToStandbyPartitions.put(hostOne, mkSet(topic2P0, topic1P1));
         hostToStandbyPartitions.put(hostTwo, Collections.singleton(topic3P0));
 
-        final List<PartitionInfo> partitionInfos = Arrays.asList(
-                new PartitionInfo("topic-one", 0, null, null, null),
-                new PartitionInfo("topic-one", 1, null, null, null),
-                new PartitionInfo("topic-two", 0, null, null, null),
-                new PartitionInfo("topic-two", 1, null, null, null),
-                new PartitionInfo("topic-three", 0, null, null, null),
-                new PartitionInfo("topic-four", 0, null, null, null));
+        partitionInfos = new HashMap<>();
+        partitionInfos.put(new TopicPartition("topic-one", 0), new 
PartitionInfo("topic-one", 0, null, null, null));
+        partitionInfos.put(new TopicPartition("topic-one", 1), new 
PartitionInfo("topic-one", 1, null, null, null));
+        partitionInfos.put(new TopicPartition("topic-two", 0), new 
PartitionInfo("topic-two", 0, null, null, null));
+        partitionInfos.put(new TopicPartition("topic-two", 1), new 
PartitionInfo("topic-two", 1, null, null, null));
+        partitionInfos.put(new TopicPartition("topic-three", 0), new 
PartitionInfo("topic-three", 0, null, null, null));
+        partitionInfos.put(new TopicPartition("topic-four", 0), new 
PartitionInfo("topic-four", 0, null, null, null));
 
-        cluster = new Cluster(null, Collections.<Node>emptyList(), 
partitionInfos, Collections.<String>emptySet(), Collections.<String>emptySet());
         final TopologyMetadata topologyMetadata = new 
TopologyMetadata(TopologyWrapper.getInternalTopologyBuilder(builder.build()), 
new DummyStreamsConfig());
         topologyMetadata.buildAndRewriteTopology();
         metadataState = new StreamsMetadataState(topologyMetadata, hostOne, 
logContext);
-        metadataState.onChange(hostToActivePartitions, 
hostToStandbyPartitions, cluster);
+        metadataState.onChange(hostToActivePartitions, 
hostToStandbyPartitions, partitionInfos);
         partitioner = (topic, key, value, numPartitions) -> 1;
         storeNames = mkSet("table-one", "table-two", "merged-table", 
globalTable);
     }
@@ -199,7 +194,7 @@ public class StreamsMetadataStateTest {
         hostToActivePartitions.put(hostFour, mkSet(tp5));
 
         metadataState.onChange(hostToActivePartitions, Collections.emptyMap(),
-            cluster.withPartitions(Collections.singletonMap(tp5, new 
PartitionInfo("topic-five", 1, null, null, null))));
+            Collections.singletonMap(tp5, new PartitionInfo("topic-five", 1, 
null, null, null)));
 
         final StreamsMetadata expected = new StreamsMetadataImpl(hostFour, 
Collections.singleton(globalTable),
                 Collections.singleton(tp5), Collections.emptySet(), 
Collections.emptySet());
@@ -246,7 +241,7 @@ public class StreamsMetadataStateTest {
         hostToActivePartitions.put(hostTwo, mkSet(topic2P0, tp4));
 
         metadataState.onChange(hostToActivePartitions, hostToStandbyPartitions,
-            cluster.withPartitions(Collections.singletonMap(tp4, new 
PartitionInfo("topic-three", 1, null, null, null))));
+            Collections.singletonMap(tp4, new PartitionInfo("topic-three", 1, 
null, null, null)));
 
         final KeyQueryMetadata expected = new KeyQueryMetadata(hostThree, 
mkSet(hostTwo), 0);
         final KeyQueryMetadata actual = 
metadataState.getKeyQueryMetadataForKey("table-three",
@@ -261,7 +256,7 @@ public class StreamsMetadataStateTest {
         hostToActivePartitions.put(hostTwo, mkSet(topic2P0, tp4));
 
         metadataState.onChange(hostToActivePartitions, hostToStandbyPartitions,
-            cluster.withPartitions(Collections.singletonMap(tp4, new 
PartitionInfo("topic-three", 1, null, null, null))));
+            Collections.singletonMap(tp4, new PartitionInfo("topic-three", 1, 
null, null, null)));
 
         final KeyQueryMetadata expected = new KeyQueryMetadata(hostTwo, 
Collections.emptySet(), 1);
 
@@ -278,7 +273,7 @@ public class StreamsMetadataStateTest {
         hostToActivePartitions.put(hostTwo, mkSet(topic2P0, tp4));
 
         metadataState.onChange(hostToActivePartitions, hostToStandbyPartitions,
-                cluster.withPartitions(Collections.singletonMap(tp4, new 
PartitionInfo("topic-three", 1, null, null, null))));
+                Collections.singletonMap(tp4, new PartitionInfo("topic-three", 
1, null, null, null)));
 
 
         assertThrows(IllegalArgumentException.class, () -> 
metadataState.getKeyQueryMetadataForKey("table-three",
@@ -288,7 +283,7 @@ public class StreamsMetadataStateTest {
 
     @Test
     public void shouldReturnNotAvailableWhenClusterIsEmpty() {
-        metadataState.onChange(Collections.emptyMap(), Collections.emptyMap(), 
Cluster.empty());
+        metadataState.onChange(Collections.emptyMap(), Collections.emptyMap(), 
Collections.emptyMap());
         final KeyQueryMetadata result = 
metadataState.getKeyQueryMetadataForKey("table-one", "a", 
Serdes.String().serializer());
         assertEquals(KeyQueryMetadata.NOT_AVAILABLE, result);
     }
@@ -299,7 +294,7 @@ public class StreamsMetadataStateTest {
         hostToActivePartitions.put(hostTwo, mkSet(topic2P0, topic1P1, 
topic2P2));
         hostToStandbyPartitions.put(hostOne, mkSet(topic2P0, topic1P1, 
topic2P2));
         metadataState.onChange(hostToActivePartitions, hostToStandbyPartitions,
-                cluster.withPartitions(Collections.singletonMap(topic2P2, new 
PartitionInfo("topic-two", 2, null, null, null))));
+                Collections.singletonMap(topic2P2, new 
PartitionInfo("topic-two", 2, null, null, null)));
 
         final KeyQueryMetadata expected = new KeyQueryMetadata(hostTwo, 
mkSet(hostOne), 2);
 
@@ -371,7 +366,7 @@ public class StreamsMetadataStateTest {
             StreamsMetadataState.UNKNOWN_HOST,
             logContext
         );
-        streamsMetadataState.onChange(hostToActivePartitions, 
hostToStandbyPartitions, cluster);
+        streamsMetadataState.onChange(hostToActivePartitions, 
hostToStandbyPartitions, partitionInfos);
         
assertNotNull(streamsMetadataState.getKeyQueryMetadataForKey(globalTable, 
"key", Serdes.String().serializer()));
     }
 
@@ -389,7 +384,7 @@ public class StreamsMetadataStateTest {
             StreamsMetadataState.UNKNOWN_HOST,
             logContext
         );
-        streamsMetadataState.onChange(hostToActivePartitions, 
hostToStandbyPartitions, cluster);
+        streamsMetadataState.onChange(hostToActivePartitions, 
hostToStandbyPartitions, partitionInfos);
         
assertNotNull(streamsMetadataState.getKeyQueryMetadataForKey(globalTable, 
"key", partitioner));
     }
 
@@ -398,7 +393,7 @@ public class StreamsMetadataStateTest {
         final Collection<StreamsMetadata> allMetadata = 
metadataState.getAllMetadata();
         final Collection<StreamsMetadata> copy = new ArrayList<>(allMetadata);
         assertFalse("invalid test", allMetadata.isEmpty());
-        metadataState.onChange(Collections.emptyMap(), Collections.emptyMap(), 
cluster);
+        metadataState.onChange(Collections.emptyMap(), Collections.emptyMap(), 
partitionInfos);
         assertEquals("encapsulation broken", allMetadata, copy);
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index a03970811eb..12748254d18 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -75,6 +75,7 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnit;
 import org.mockito.junit.MockitoRule;
@@ -207,6 +208,8 @@ public class StreamsPartitionAssignorTest {
     private TopologyMetadata topologyMetadata;
     @Mock
     private StreamsMetadataState streamsMetadataState;
+    @Captor
+    private ArgumentCaptor<Map<TopicPartition, PartitionInfo>> 
topicPartitionInfoCaptor;
     private final Map<String, Subscription> subscriptions = new HashMap<>();
     private final Class<? extends TaskAssignor> taskAssignor;
     private Map<String, String> clientTags;
@@ -1182,7 +1185,6 @@ public class StreamsPartitionAssignorTest {
         standbyTasks.put(TASK_0_2, mkSet(t3p2));
 
         streamsMetadataState = mock(StreamsMetadataState.class);
-        final ArgumentCaptor<Cluster> capturedCluster = 
ArgumentCaptor.forClass(Cluster.class);
 
         configureDefaultPartitionAssignor();
 
@@ -1192,11 +1194,12 @@ public class StreamsPartitionAssignorTest {
 
         partitionAssignor.onAssignment(assignment, null);
 
-        verify(streamsMetadataState).onChange(eq(hostState), any(), 
capturedCluster.capture());
+        verify(streamsMetadataState).onChange(eq(hostState), any(), 
topicPartitionInfoCaptor.capture());
         verify(taskManager).handleAssignment(activeTasks, standbyTasks);
 
-        assertEquals(singleton(t3p0.topic()), 
capturedCluster.getValue().topics());
-        assertEquals(2, 
capturedCluster.getValue().partitionsForTopic(t3p0.topic()).size());
+        assertTrue(topicPartitionInfoCaptor.getValue().containsKey(t3p0));
+        assertTrue(topicPartitionInfoCaptor.getValue().containsKey(t3p3));
+        assertEquals(2, topicPartitionInfoCaptor.getValue().size());
     }
 
     @Test

Reply via email to