This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 513e1c641d6 KAFKA-14539: Simplify StreamsMetadataState by replacing
the Cluster metadata with partition info map (#13751)
513e1c641d6 is described below
commit 513e1c641d63c5e15144f9fcdafa1b56c5e5ba09
Author: Danica Fine <[email protected]>
AuthorDate: Wed Jun 7 21:35:11 2023 +0200
KAFKA-14539: Simplify StreamsMetadataState by replacing the Cluster
metadata with partition info map (#13751)
Replace usage of Cluster in StreamsMetadataState with Map<String, List>.
Update StreamsPartitionAssignor#onAssignment method to pass existing
Map<TopicPartition, PartitionInfo> instead of fake Cluster object.
Behavior remains the same; updated existing unit tests accordingly.
Reviewers: Walker Carlson <[email protected]>, Bill Bejeck
<[email protected]>
---
.../processor/internals/StreamsMetadataState.java | 23 +++++++-----
.../internals/StreamsPartitionAssignor.java | 3 +-
.../internals/StreamsMetadataStateTest.java | 41 ++++++++++------------
.../internals/StreamsPartitionAssignorTest.java | 11 +++---
4 files changed, 40 insertions(+), 38 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
index 49fa34bc510..41f93c88ed5 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
@@ -22,7 +22,6 @@ import java.util.function.Function;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serializer;
@@ -40,6 +39,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -59,7 +59,7 @@ public class StreamsMetadataState {
private final Set<String> globalStores;
private final HostInfo thisHost;
private List<StreamsMetadata> allMetadata = Collections.emptyList();
- private Cluster clusterMetadata;
+ private Map<String, List<PartitionInfo>> partitionsByTopic;
private final AtomicReference<StreamsMetadata> localMetadata = new
AtomicReference<>(null);
public StreamsMetadataState(final TopologyMetadata topologyMetadata,
@@ -81,7 +81,7 @@ public class StreamsMetadataState {
builder.append(indent).append("GlobalMetadata:
").append(allMetadata).append("\n");
builder.append(indent).append("GlobalStores:
").append(globalStores).append("\n");
builder.append(indent).append("My HostInfo:
").append(thisHost).append("\n");
- builder.append(indent).append(clusterMetadata).append("\n");
+ builder.append(indent).append("PartitionsByTopic:
").append(partitionsByTopic).append("\n");
return builder.toString();
}
@@ -308,12 +308,17 @@ public class StreamsMetadataState {
*
* @param activePartitionHostMap the current mapping of {@link HostInfo}
-> {@link TopicPartition}s for active partitions
* @param standbyPartitionHostMap the current mapping of {@link HostInfo}
-> {@link TopicPartition}s for standby partitions
- * @param clusterMetadata the current clusterMetadata {@link
Cluster}
+ * @param topicPartitionInfo the current mapping of {@link
TopicPartition} -> {@link PartitionInfo}
*/
synchronized void onChange(final Map<HostInfo, Set<TopicPartition>>
activePartitionHostMap,
final Map<HostInfo, Set<TopicPartition>>
standbyPartitionHostMap,
- final Cluster clusterMetadata) {
- this.clusterMetadata = clusterMetadata;
+ final Map<TopicPartition, PartitionInfo>
topicPartitionInfo) {
+ this.partitionsByTopic = new HashMap<>();
+ topicPartitionInfo.entrySet().forEach(entry -> this.partitionsByTopic
+ .computeIfAbsent(entry.getKey().topic(), topic -> new
ArrayList<>())
+ .add(entry.getValue())
+ );
+
rebuildMetadata(activePartitionHostMap, standbyPartitionHostMap);
}
@@ -558,7 +563,7 @@ public class StreamsMetadataState {
}
private boolean isInitialized() {
- return clusterMetadata != null && !clusterMetadata.topics().isEmpty()
&& localMetadata.get() != null;
+ return partitionsByTopic != null && !partitionsByTopic.isEmpty() &&
localMetadata.get() != null;
}
public String getStoreForChangelogTopic(final String topicName) {
@@ -573,10 +578,10 @@ public class StreamsMetadataState {
private SourceTopicsInfo(final List<String> sourceTopics) {
this.sourceTopics = sourceTopics;
for (final String topic : sourceTopics) {
- final List<PartitionInfo> partitions =
clusterMetadata.partitionsForTopic(topic);
+ final List<PartitionInfo> partitions =
partitionsByTopic.getOrDefault(topic, Collections.emptyList());
if (partitions.size() > maxPartitions) {
maxPartitions = partitions.size();
- topicWithMostPartitions = partitions.get(0).topic();
+ topicWithMostPartitions = topic;
}
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 54ab6e8fa0e..64f80f7963f 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -1344,8 +1344,7 @@ public class StreamsPartitionAssignor implements
ConsumerPartitionAssignor, Conf
partitionsByHost.keySet()
);
- final Cluster fakeCluster =
Cluster.empty().withPartitions(topicToPartitionInfo);
- streamsMetadataState.onChange(partitionsByHost,
standbyPartitionsByHost, fakeCluster);
+ streamsMetadataState.onChange(partitionsByHost,
standbyPartitionsByHost, topicToPartitionInfo);
// we do not capture any exceptions but just let the exception thrown
from consumer.poll directly
// since when stream thread captures it, either we close all tasks as
dirty or we close thread
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
index 2343ece6702..8113b41ca93 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
@@ -16,8 +16,6 @@
*/
package org.apache.kafka.streams.processor.internals;
-import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serdes;
@@ -37,12 +35,10 @@ import
org.apache.kafka.streams.state.internals.StreamsMetadataImpl;
import org.junit.Before;
import org.junit.Test;
-import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Optional;
@@ -73,7 +69,7 @@ public class StreamsMetadataStateTest {
private TopicPartition topic1P1;
private TopicPartition topic2P1;
private TopicPartition topic4P0;
- private Cluster cluster;
+ private Map<TopicPartition, PartitionInfo> partitionInfos;
private final String globalTable = "global-table";
private final LogContext logContext = new LogContext(String.format("test
[%s] ", "StreamsMetadataStateTest"));
private StreamPartitioner<String, Object> partitioner;
@@ -121,19 +117,18 @@ public class StreamsMetadataStateTest {
hostToStandbyPartitions.put(hostOne, mkSet(topic2P0, topic1P1));
hostToStandbyPartitions.put(hostTwo, Collections.singleton(topic3P0));
- final List<PartitionInfo> partitionInfos = Arrays.asList(
- new PartitionInfo("topic-one", 0, null, null, null),
- new PartitionInfo("topic-one", 1, null, null, null),
- new PartitionInfo("topic-two", 0, null, null, null),
- new PartitionInfo("topic-two", 1, null, null, null),
- new PartitionInfo("topic-three", 0, null, null, null),
- new PartitionInfo("topic-four", 0, null, null, null));
+ partitionInfos = new HashMap<>();
+ partitionInfos.put(new TopicPartition("topic-one", 0), new
PartitionInfo("topic-one", 0, null, null, null));
+ partitionInfos.put(new TopicPartition("topic-one", 1), new
PartitionInfo("topic-one", 1, null, null, null));
+ partitionInfos.put(new TopicPartition("topic-two", 0), new
PartitionInfo("topic-two", 0, null, null, null));
+ partitionInfos.put(new TopicPartition("topic-two", 1), new
PartitionInfo("topic-two", 1, null, null, null));
+ partitionInfos.put(new TopicPartition("topic-three", 0), new
PartitionInfo("topic-three", 0, null, null, null));
+ partitionInfos.put(new TopicPartition("topic-four", 0), new
PartitionInfo("topic-four", 0, null, null, null));
- cluster = new Cluster(null, Collections.<Node>emptyList(),
partitionInfos, Collections.<String>emptySet(), Collections.<String>emptySet());
final TopologyMetadata topologyMetadata = new
TopologyMetadata(TopologyWrapper.getInternalTopologyBuilder(builder.build()),
new DummyStreamsConfig());
topologyMetadata.buildAndRewriteTopology();
metadataState = new StreamsMetadataState(topologyMetadata, hostOne,
logContext);
- metadataState.onChange(hostToActivePartitions,
hostToStandbyPartitions, cluster);
+ metadataState.onChange(hostToActivePartitions,
hostToStandbyPartitions, partitionInfos);
partitioner = (topic, key, value, numPartitions) -> 1;
storeNames = mkSet("table-one", "table-two", "merged-table",
globalTable);
}
@@ -199,7 +194,7 @@ public class StreamsMetadataStateTest {
hostToActivePartitions.put(hostFour, mkSet(tp5));
metadataState.onChange(hostToActivePartitions, Collections.emptyMap(),
- cluster.withPartitions(Collections.singletonMap(tp5, new
PartitionInfo("topic-five", 1, null, null, null))));
+ Collections.singletonMap(tp5, new PartitionInfo("topic-five", 1,
null, null, null)));
final StreamsMetadata expected = new StreamsMetadataImpl(hostFour,
Collections.singleton(globalTable),
Collections.singleton(tp5), Collections.emptySet(),
Collections.emptySet());
@@ -246,7 +241,7 @@ public class StreamsMetadataStateTest {
hostToActivePartitions.put(hostTwo, mkSet(topic2P0, tp4));
metadataState.onChange(hostToActivePartitions, hostToStandbyPartitions,
- cluster.withPartitions(Collections.singletonMap(tp4, new
PartitionInfo("topic-three", 1, null, null, null))));
+ Collections.singletonMap(tp4, new PartitionInfo("topic-three", 1,
null, null, null)));
final KeyQueryMetadata expected = new KeyQueryMetadata(hostThree,
mkSet(hostTwo), 0);
final KeyQueryMetadata actual =
metadataState.getKeyQueryMetadataForKey("table-three",
@@ -261,7 +256,7 @@ public class StreamsMetadataStateTest {
hostToActivePartitions.put(hostTwo, mkSet(topic2P0, tp4));
metadataState.onChange(hostToActivePartitions, hostToStandbyPartitions,
- cluster.withPartitions(Collections.singletonMap(tp4, new
PartitionInfo("topic-three", 1, null, null, null))));
+ Collections.singletonMap(tp4, new PartitionInfo("topic-three", 1,
null, null, null)));
final KeyQueryMetadata expected = new KeyQueryMetadata(hostTwo,
Collections.emptySet(), 1);
@@ -278,7 +273,7 @@ public class StreamsMetadataStateTest {
hostToActivePartitions.put(hostTwo, mkSet(topic2P0, tp4));
metadataState.onChange(hostToActivePartitions, hostToStandbyPartitions,
- cluster.withPartitions(Collections.singletonMap(tp4, new
PartitionInfo("topic-three", 1, null, null, null))));
+ Collections.singletonMap(tp4, new PartitionInfo("topic-three",
1, null, null, null)));
assertThrows(IllegalArgumentException.class, () ->
metadataState.getKeyQueryMetadataForKey("table-three",
@@ -288,7 +283,7 @@ public class StreamsMetadataStateTest {
@Test
public void shouldReturnNotAvailableWhenClusterIsEmpty() {
- metadataState.onChange(Collections.emptyMap(), Collections.emptyMap(),
Cluster.empty());
+ metadataState.onChange(Collections.emptyMap(), Collections.emptyMap(),
Collections.emptyMap());
final KeyQueryMetadata result =
metadataState.getKeyQueryMetadataForKey("table-one", "a",
Serdes.String().serializer());
assertEquals(KeyQueryMetadata.NOT_AVAILABLE, result);
}
@@ -299,7 +294,7 @@ public class StreamsMetadataStateTest {
hostToActivePartitions.put(hostTwo, mkSet(topic2P0, topic1P1,
topic2P2));
hostToStandbyPartitions.put(hostOne, mkSet(topic2P0, topic1P1,
topic2P2));
metadataState.onChange(hostToActivePartitions, hostToStandbyPartitions,
- cluster.withPartitions(Collections.singletonMap(topic2P2, new
PartitionInfo("topic-two", 2, null, null, null))));
+ Collections.singletonMap(topic2P2, new
PartitionInfo("topic-two", 2, null, null, null)));
final KeyQueryMetadata expected = new KeyQueryMetadata(hostTwo,
mkSet(hostOne), 2);
@@ -371,7 +366,7 @@ public class StreamsMetadataStateTest {
StreamsMetadataState.UNKNOWN_HOST,
logContext
);
- streamsMetadataState.onChange(hostToActivePartitions,
hostToStandbyPartitions, cluster);
+ streamsMetadataState.onChange(hostToActivePartitions,
hostToStandbyPartitions, partitionInfos);
assertNotNull(streamsMetadataState.getKeyQueryMetadataForKey(globalTable,
"key", Serdes.String().serializer()));
}
@@ -389,7 +384,7 @@ public class StreamsMetadataStateTest {
StreamsMetadataState.UNKNOWN_HOST,
logContext
);
- streamsMetadataState.onChange(hostToActivePartitions,
hostToStandbyPartitions, cluster);
+ streamsMetadataState.onChange(hostToActivePartitions,
hostToStandbyPartitions, partitionInfos);
assertNotNull(streamsMetadataState.getKeyQueryMetadataForKey(globalTable,
"key", partitioner));
}
@@ -398,7 +393,7 @@ public class StreamsMetadataStateTest {
final Collection<StreamsMetadata> allMetadata =
metadataState.getAllMetadata();
final Collection<StreamsMetadata> copy = new ArrayList<>(allMetadata);
assertFalse("invalid test", allMetadata.isEmpty());
- metadataState.onChange(Collections.emptyMap(), Collections.emptyMap(),
cluster);
+ metadataState.onChange(Collections.emptyMap(), Collections.emptyMap(),
partitionInfos);
assertEquals("encapsulation broken", allMetadata, copy);
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index a03970811eb..12748254d18 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -75,6 +75,7 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
@@ -207,6 +208,8 @@ public class StreamsPartitionAssignorTest {
private TopologyMetadata topologyMetadata;
@Mock
private StreamsMetadataState streamsMetadataState;
+ @Captor
+ private ArgumentCaptor<Map<TopicPartition, PartitionInfo>>
topicPartitionInfoCaptor;
private final Map<String, Subscription> subscriptions = new HashMap<>();
private final Class<? extends TaskAssignor> taskAssignor;
private Map<String, String> clientTags;
@@ -1182,7 +1185,6 @@ public class StreamsPartitionAssignorTest {
standbyTasks.put(TASK_0_2, mkSet(t3p2));
streamsMetadataState = mock(StreamsMetadataState.class);
- final ArgumentCaptor<Cluster> capturedCluster =
ArgumentCaptor.forClass(Cluster.class);
configureDefaultPartitionAssignor();
@@ -1192,11 +1194,12 @@ public class StreamsPartitionAssignorTest {
partitionAssignor.onAssignment(assignment, null);
- verify(streamsMetadataState).onChange(eq(hostState), any(),
capturedCluster.capture());
+ verify(streamsMetadataState).onChange(eq(hostState), any(),
topicPartitionInfoCaptor.capture());
verify(taskManager).handleAssignment(activeTasks, standbyTasks);
- assertEquals(singleton(t3p0.topic()),
capturedCluster.getValue().topics());
- assertEquals(2,
capturedCluster.getValue().partitionsForTopic(t3p0.topic()).size());
+ assertTrue(topicPartitionInfoCaptor.getValue().containsKey(t3p0));
+ assertTrue(topicPartitionInfoCaptor.getValue().containsKey(t3p3));
+ assertEquals(2, topicPartitionInfoCaptor.getValue().size());
}
@Test