This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 431cffc93f2 KAFKA-19135 Migrate initial IQ support for KIP-1071 from
feature branch to trunk (#19588)
431cffc93f2 is described below
commit 431cffc93f2a9c1a178bd4caede1001d329ab10b
Author: Bill Bejeck <[email protected]>
AuthorDate: Tue Apr 29 20:08:49 2025 -0400
KAFKA-19135 Migrate initial IQ support for KIP-1071 from feature branch to
trunk (#19588)
This PR is a migration of the initial IQ support for KIP-1071 from the
feature branch to trunk. It includes a parameterized integration test
that expects the same results whether using the classic or the new
streams group protocol.
Note that this PR delivers IQ information in every heartbeat
response. A follow-up PR will change this so that IQ information
is sent only when assignments change.
Reviewers: Lucas Brutschy <[email protected]>
---
.../StreamsGroupHeartbeatRequestManager.java | 22 +-
.../consumer/internals/StreamsRebalanceData.java | 32 ++-
.../message/StreamsGroupHeartbeatResponse.json | 6 +-
.../StreamsGroupHeartbeatRequestManagerTest.java | 8 +-
.../coordinator/group/GroupMetadataManager.java | 23 +-
.../topics/EndpointToPartitionsManager.java | 86 +++++++
.../topics/EndpointToPartitionsManagerTest.java | 162 +++++++++++++
.../IQv2EndpointToPartitionsIntegrationTest.java | 270 +++++++++++++++++++++
.../streams/processor/internals/StreamThread.java | 15 ++
.../processor/internals/StreamThreadTest.java | 14 +-
.../org/apache/kafka/streams/utils/TestUtils.java | 2 +-
11 files changed, 620 insertions(+), 20 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
index b86c6d0498c..8f7d08e23a5 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
@@ -674,16 +674,24 @@ public class StreamsGroupHeartbeatRequestManager
implements RequestManager {
membershipManager.transitionToFatal();
}
- private static Map<StreamsRebalanceData.HostInfo, List<TopicPartition>>
convertHostInfoMap(final StreamsGroupHeartbeatResponseData data) {
- Map<StreamsRebalanceData.HostInfo, List<TopicPartition>>
partitionsByHost = new HashMap<>();
+ private static Map<StreamsRebalanceData.HostInfo,
StreamsRebalanceData.EndpointPartitions> convertHostInfoMap(
+ final StreamsGroupHeartbeatResponseData data) {
+ Map<StreamsRebalanceData.HostInfo,
StreamsRebalanceData.EndpointPartitions> partitionsByHost = new HashMap<>();
data.partitionsByUserEndpoint().forEach(endpoint -> {
- List<TopicPartition> topicPartitions =
endpoint.partitions().stream()
- .flatMap(partition ->
- partition.partitions().stream().map(partitionId -> new
TopicPartition(partition.topic(), partitionId)))
- .collect(Collectors.toList());
+ List<TopicPartition> activeTopicPartitions =
getTopicPartitionList(endpoint.activePartitions());
+ List<TopicPartition> standbyTopicPartitions =
getTopicPartitionList(endpoint.standbyPartitions());
StreamsGroupHeartbeatResponseData.Endpoint userEndpoint =
endpoint.userEndpoint();
- partitionsByHost.put(new
StreamsRebalanceData.HostInfo(userEndpoint.host(), userEndpoint.port()),
topicPartitions);
+ StreamsRebalanceData.EndpointPartitions endpointPartitions = new
StreamsRebalanceData.EndpointPartitions(activeTopicPartitions,
standbyTopicPartitions);
+ partitionsByHost.put(new
StreamsRebalanceData.HostInfo(userEndpoint.host(), userEndpoint.port()),
endpointPartitions);
});
return partitionsByHost;
}
+
+ static List<TopicPartition>
getTopicPartitionList(List<StreamsGroupHeartbeatResponseData.TopicPartition>
topicPartitions) {
+ return topicPartitions.stream()
+ .flatMap(partition ->
+ partition.partitions().stream().map(partitionId -> new
TopicPartition(partition.topic(), partitionId)))
+ .collect(Collectors.toList());
+ }
+
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java
index 0158370a509..2fe7ae8ad35 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java
@@ -19,6 +19,7 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
@@ -119,6 +120,31 @@ public class StreamsRebalanceData {
}
}
+ public static class EndpointPartitions {
+ private final List<TopicPartition> activePartitions;
+ private final List<TopicPartition> standbyPartitions;
+
+ public EndpointPartitions(final List<TopicPartition> activePartitions,
+ final List<TopicPartition>
standbyPartitions) {
+ this.activePartitions = activePartitions;
+ this.standbyPartitions = standbyPartitions;
+ }
+
+ public List<TopicPartition> activePartitions() {
+ return new ArrayList<>(activePartitions);
+ }
+
+ public List<TopicPartition> standbyPartitions() {
+ return new ArrayList<>(standbyPartitions);
+ }
+ @Override
+ public String toString() {
+ return "EndpointPartitions {"
+ + "activePartitions=" + activePartitions
+ + ", standbyPartitions=" + standbyPartitions
+ + '}';
+ }
+ }
public static class Assignment {
@@ -297,7 +323,7 @@ public class StreamsRebalanceData {
private final AtomicReference<Assignment> reconciledAssignment = new
AtomicReference<>(Assignment.EMPTY);
- private final AtomicReference<Map<HostInfo, List<TopicPartition>>>
partitionsByHost = new AtomicReference<>(Collections.emptyMap());
+ private final AtomicReference<Map<HostInfo, EndpointPartitions>>
partitionsByHost = new AtomicReference<>(Collections.emptyMap());
private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
@@ -341,11 +367,11 @@ public class StreamsRebalanceData {
return reconciledAssignment.get();
}
- public void setPartitionsByHost(final Map<HostInfo, List<TopicPartition>>
partitionsByHost) {
+ public void setPartitionsByHost(final Map<HostInfo, EndpointPartitions>
partitionsByHost) {
this.partitionsByHost.set(partitionsByHost);
}
- public Map<HostInfo, List<TopicPartition>> partitionsByHost() {
+ public Map<HostInfo, EndpointPartitions> partitionsByHost() {
return partitionsByHost.get();
}
diff --git
a/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json
b/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json
index a5f3a99f9de..7127fcd1282 100644
---
a/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json
+++
b/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json
@@ -70,8 +70,10 @@
"fields": [
{ "name": "UserEndpoint", "type": "Endpoint", "versions": "0+",
"about": "User-defined endpoint to connect to the node" },
- { "name": "Partitions", "type": "[]TopicPartition", "versions": "0+",
- "about": "All partitions available on the node" }
+ { "name": "ActivePartitions", "type": "[]TopicPartition", "versions":
"0+",
+ "about": "All topic partitions materialized by active tasks on the
node" },
+ { "name": "StandbyPartitions", "type": "[]TopicPartition", "versions":
"0+",
+ "about": "All topic partitions materialized by standby tasks on the
node" }
]
}
],
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
index 5e0c6652ef6..9839f3b2210 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
@@ -153,7 +153,7 @@ class StreamsGroupHeartbeatRequestManagerTest {
List.of(
new StreamsGroupHeartbeatResponseData.EndpointToPartitions()
.setUserEndpoint(new
StreamsGroupHeartbeatResponseData.Endpoint().setHost("localhost").setPort(8080))
- .setPartitions(List.of(
+ .setActivePartitions(List.of(
new
StreamsGroupHeartbeatResponseData.TopicPartition().setTopic("topic").setPartitions(List.of(0)))
)
);
@@ -591,9 +591,9 @@ class StreamsGroupHeartbeatRequestManagerTest {
.get(new StreamsRebalanceData.HostInfo(
ENDPOINT_TO_PARTITIONS.get(0).userEndpoint().host(),
ENDPOINT_TO_PARTITIONS.get(0).userEndpoint().port())
- );
-
assertEquals(ENDPOINT_TO_PARTITIONS.get(0).partitions().get(0).topic(),
topicPartitions.get(0).topic());
-
assertEquals(ENDPOINT_TO_PARTITIONS.get(0).partitions().get(0).partitions().get(0),
topicPartitions.get(0).partition());
+ ).activePartitions();
+
assertEquals(ENDPOINT_TO_PARTITIONS.get(0).activePartitions().get(0).topic(),
topicPartitions.get(0).topic());
+
assertEquals(ENDPOINT_TO_PARTITIONS.get(0).activePartitions().get(0).partitions().get(0),
topicPartitions.get(0).partition());
assertEquals(
1.0,
metrics.metric(metrics.metricName("heartbeat-total",
"consumer-coordinator-metrics")).metricValue()
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index ad4a89d90eb..fb2c7c4c6af 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -158,6 +158,7 @@ import
org.apache.kafka.coordinator.group.streams.assignor.StickyTaskAssignor;
import org.apache.kafka.coordinator.group.streams.assignor.TaskAssignor;
import
org.apache.kafka.coordinator.group.streams.assignor.TaskAssignorException;
import org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology;
+import
org.apache.kafka.coordinator.group.streams.topics.EndpointToPartitionsManager;
import org.apache.kafka.coordinator.group.streams.topics.InternalTopicManager;
import
org.apache.kafka.coordinator.group.streams.topics.TopicConfigurationException;
import org.apache.kafka.image.MetadataDelta;
@@ -1982,7 +1983,8 @@ public class GroupMetadataManager {
StreamsGroupHeartbeatResponseData response = new
StreamsGroupHeartbeatResponseData()
.setMemberId(updatedMember.memberId())
.setMemberEpoch(updatedMember.memberEpoch())
- .setHeartbeatIntervalMs(streamsGroupHeartbeatIntervalMs(groupId));
+ .setHeartbeatIntervalMs(streamsGroupHeartbeatIntervalMs(groupId))
+
.setPartitionsByUserEndpoint(maybeBuildEndpointToPartitions(group));
// The assignment is only provided in the following cases:
// 1. The member is joining.
@@ -2093,6 +2095,25 @@ public class GroupMetadataManager {
.collect(Collectors.toList());
}
+ private List<StreamsGroupHeartbeatResponseData.EndpointToPartitions>
maybeBuildEndpointToPartitions(StreamsGroup group) {
+ List<StreamsGroupHeartbeatResponseData.EndpointToPartitions>
endpointToPartitionsList = new ArrayList<>();
+ final Map<String, StreamsGroupMember> members = group.members();
+ for (Map.Entry<String, StreamsGroupMember> entry : members.entrySet())
{
+ final String memberIdForAssignment = entry.getKey();
+ final Optional<StreamsGroupMemberMetadataValue.Endpoint>
endpointOptional = members.get(memberIdForAssignment).userEndpoint();
+ StreamsGroupMember groupMember = entry.getValue();
+ if (endpointOptional.isPresent()) {
+ final StreamsGroupMemberMetadataValue.Endpoint endpoint =
endpointOptional.get();
+ final StreamsGroupHeartbeatResponseData.Endpoint
responseEndpoint = new StreamsGroupHeartbeatResponseData.Endpoint();
+ responseEndpoint.setHost(endpoint.host());
+ responseEndpoint.setPort(endpoint.port());
+ StreamsGroupHeartbeatResponseData.EndpointToPartitions
endpointToPartitions =
EndpointToPartitionsManager.endpointToPartitions(groupMember, responseEndpoint,
group);
+ endpointToPartitionsList.add(endpointToPartitions);
+ }
+ }
+ return endpointToPartitionsList.isEmpty() ? null :
endpointToPartitionsList;
+ }
+
/**
* Handles a regular heartbeat from a consumer group member. It mainly
consists of
* three parts:
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/EndpointToPartitionsManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/EndpointToPartitionsManager.java
new file mode 100644
index 00000000000..ea3eca20935
--- /dev/null
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/EndpointToPartitionsManager.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.streams.topics;
+
+import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
+import org.apache.kafka.coordinator.group.streams.StreamsGroup;
+import org.apache.kafka.coordinator.group.streams.StreamsGroupMember;
+import org.apache.kafka.coordinator.group.streams.TopicMetadata;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class EndpointToPartitionsManager {
+
+ private EndpointToPartitionsManager() {
+ }
+
+ public static StreamsGroupHeartbeatResponseData.EndpointToPartitions
endpointToPartitions(final StreamsGroupMember streamsGroupMember,
+
final StreamsGroupHeartbeatResponseData.Endpoint
responseEndpoint,
+
final StreamsGroup streamsGroup) {
+ StreamsGroupHeartbeatResponseData.EndpointToPartitions
endpointToPartitions = new
StreamsGroupHeartbeatResponseData.EndpointToPartitions();
+ Map<String, Set<Integer>> activeTasks =
streamsGroupMember.assignedTasks().activeTasks();
+ Map<String, Set<Integer>> standbyTasks =
streamsGroupMember.assignedTasks().standbyTasks();
+ endpointToPartitions.setUserEndpoint(responseEndpoint);
+ Map<String, ConfiguredSubtopology> configuredSubtopologies =
streamsGroup.configuredTopology().flatMap(ConfiguredTopology::subtopologies).get();
+ List<StreamsGroupHeartbeatResponseData.TopicPartition>
activeTopicPartitions = topicPartitions(activeTasks, configuredSubtopologies,
streamsGroup.partitionMetadata());
+ List<StreamsGroupHeartbeatResponseData.TopicPartition>
standbyTopicPartitions = topicPartitions(standbyTasks, configuredSubtopologies,
streamsGroup.partitionMetadata());
+ endpointToPartitions.setActivePartitions(activeTopicPartitions);
+ endpointToPartitions.setStandbyPartitions(standbyTopicPartitions);
+ return endpointToPartitions;
+ }
+
+ private static List<StreamsGroupHeartbeatResponseData.TopicPartition>
topicPartitions(final Map<String, Set<Integer>> tasks,
+
final Map<String, ConfiguredSubtopology> configuredSubtopologies,
+
final Map<String, TopicMetadata> groupTopicMetadata) {
+ List<StreamsGroupHeartbeatResponseData.TopicPartition>
topicPartitionsForTasks = new ArrayList<>();
+ for (Map.Entry<String, Set<Integer>> taskEntry : tasks.entrySet()) {
+ String subtopologyId = taskEntry.getKey();
+ ConfiguredSubtopology configuredSubtopology =
configuredSubtopologies.get(subtopologyId);
+ Set<String> sourceTopics = configuredSubtopology.sourceTopics();
+ Set<String> repartitionSourceTopics =
configuredSubtopology.repartitionSourceTopics().keySet();
+ Set<String> allSourceTopic = new HashSet<>(sourceTopics);
+ allSourceTopic.addAll(repartitionSourceTopics);
+ List<StreamsGroupHeartbeatResponseData.TopicPartition>
topicPartitionList = topicPartitionListForTask(taskEntry.getValue(),
allSourceTopic, groupTopicMetadata);
+ topicPartitionsForTasks.addAll(topicPartitionList);
+ }
+ return topicPartitionsForTasks;
+ }
+
+ private static List<StreamsGroupHeartbeatResponseData.TopicPartition>
topicPartitionListForTask(final Set<Integer> taskSet,
+
final Set<String> topicNames,
+
final Map<String, TopicMetadata> groupTopicMetadata) {
+ return topicNames.stream().map(topic -> {
+ int numPartitionsForTopic =
groupTopicMetadata.get(topic).numPartitions();
+ StreamsGroupHeartbeatResponseData.TopicPartition tp = new
StreamsGroupHeartbeatResponseData.TopicPartition();
+ tp.setTopic(topic);
+ List<Integer> tpPartitions = new ArrayList<>(taskSet);
+ if (numPartitionsForTopic < taskSet.size()) {
+ Collections.sort(tpPartitions);
+ tp.setPartitions(tpPartitions.subList(0,
numPartitionsForTopic));
+ } else {
+ tp.setPartitions(tpPartitions);
+ }
+ return tp;
+ }).toList();
+ }
+}
\ No newline at end of file
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/EndpointToPartitionsManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/EndpointToPartitionsManagerTest.java
new file mode 100644
index 00000000000..2002774b60b
--- /dev/null
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/EndpointToPartitionsManagerTest.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.streams.topics;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
+import org.apache.kafka.coordinator.group.streams.StreamsGroup;
+import org.apache.kafka.coordinator.group.streams.StreamsGroupMember;
+import org.apache.kafka.coordinator.group.streams.TasksTuple;
+import org.apache.kafka.coordinator.group.streams.TopicMetadata;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.params.provider.Arguments.arguments;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+class EndpointToPartitionsManagerTest {
+
+ private StreamsGroup streamsGroup;
+ private StreamsGroupMember streamsGroupMember;
+ private ConfiguredTopology configuredTopology;
+ private ConfiguredSubtopology configuredSubtopologyOne;
+ private ConfiguredSubtopology configuredSubtopologyTwo;
+ private final Map<String, Set<Integer>> activeTasks = new HashMap<>();
+ private final Map<String, Set<Integer>> standbyTasks = new HashMap<>();
+ private TasksTuple tasksTuple;
+ private final StreamsGroupHeartbeatResponseData.Endpoint responseEndpoint
= new StreamsGroupHeartbeatResponseData.Endpoint();
+
+ @BeforeEach
+ public void setUp() {
+ streamsGroup = mock(StreamsGroup.class);
+ streamsGroupMember = mock(StreamsGroupMember.class);
+ configuredTopology = mock(ConfiguredTopology.class);
+ configuredSubtopologyOne = new
ConfiguredSubtopology(Set.of("Topic-A"), new HashMap<>(), new HashSet<>(), new
HashMap<>());
+ Map<String, ConfiguredInternalTopic> repartitionSourceTopics =
Map.of("Topic-B", new ConfiguredInternalTopic("Topic-B", 1,
Optional.of((short) 1), Collections.emptyMap()));
+ configuredSubtopologyTwo = new ConfiguredSubtopology(new HashSet<>(),
repartitionSourceTopics, new HashSet<>(), new HashMap<>());
+ SortedMap<String, ConfiguredSubtopology> configuredSubtopologyOneMap =
new TreeMap<>();
+ configuredSubtopologyOneMap.put("0", configuredSubtopologyOne);
+ SortedMap<String, ConfiguredSubtopology> configuredSubtopologyTwoMap =
new TreeMap<>();
+ configuredSubtopologyOneMap.put("1", configuredSubtopologyTwo);
+
when(configuredTopology.subtopologies()).thenReturn(Optional.of(configuredSubtopologyOneMap));
+
when(configuredTopology.subtopologies()).thenReturn(Optional.of(configuredSubtopologyTwoMap));
+ responseEndpoint.setHost("localhost");
+ responseEndpoint.setPort(9092);
+ }
+
+ @Test
+ void testEndpointToPartitionsWithStandbyTaskAssignments() {
+ Map<String, TopicMetadata> topicMetadata = new HashMap<>();
+ topicMetadata.put("Topic-A", new TopicMetadata(Uuid.randomUuid(),
"Topic-A", 3));
+ topicMetadata.put("Topic-B", new TopicMetadata(Uuid.randomUuid(),
"Topic-B", 3));
+
+ activeTasks.put("0", Set.of(0, 1, 2));
+ standbyTasks.put("1", Set.of(0, 1, 2));
+ tasksTuple = new TasksTuple(activeTasks, standbyTasks,
Collections.emptyMap());
+ when(streamsGroupMember.assignedTasks()).thenReturn(tasksTuple);
+
//when(streamsGroupMember.assignedTasks().standbyTasks()).thenReturn(tasksTuple.standbyTasks());
+ when((streamsGroup.partitionMetadata())).thenReturn(topicMetadata);
+
when(streamsGroup.configuredTopology()).thenReturn(Optional.of(configuredTopology));
+ SortedMap<String, ConfiguredSubtopology> configuredSubtopologyMap =
new TreeMap<>();
+ configuredSubtopologyMap.put("0", configuredSubtopologyOne);
+ configuredSubtopologyMap.put("1", configuredSubtopologyTwo);
+
when(configuredTopology.subtopologies()).thenReturn(Optional.of(configuredSubtopologyMap));
+
+ StreamsGroupHeartbeatResponseData.EndpointToPartitions result =
+
EndpointToPartitionsManager.endpointToPartitions(streamsGroupMember,
responseEndpoint, streamsGroup);
+
+ assertEquals(responseEndpoint, result.userEndpoint());
+ assertEquals(1, result.activePartitions().size());
+ assertEquals(1, result.standbyPartitions().size());
+ List<StreamsGroupHeartbeatResponseData.TopicPartition>
activePartitions = result.activePartitions();
+ List<StreamsGroupHeartbeatResponseData.TopicPartition>
standbyPartitions = result.standbyPartitions();
+
activePartitions.sort(Comparator.comparing(StreamsGroupHeartbeatResponseData.TopicPartition::topic));
+
standbyPartitions.sort(Comparator.comparing(StreamsGroupHeartbeatResponseData.TopicPartition::topic));
+ assertTopicPartitionsAssigned(activePartitions, "Topic-A");
+ assertTopicPartitionsAssigned(standbyPartitions, "Topic-B");
+ }
+
+ private static void
assertTopicPartitionsAssigned(List<StreamsGroupHeartbeatResponseData.TopicPartition>
topicPartitions, String topicName) {
+ StreamsGroupHeartbeatResponseData.TopicPartition topicPartition =
topicPartitions.stream().filter(tp ->
tp.topic().equals(topicName)).findFirst().get();
+ assertEquals(topicName, topicPartition.topic());
+ assertEquals(List.of(0, 1, 2),
topicPartition.partitions().stream().sorted().toList());
+ }
+
+ @ParameterizedTest(name = "{4}")
+ @MethodSource("argsProvider")
+ void testEndpointToPartitionsWithTwoTopicsAndDifferentPartitions(int
topicAPartitions,
+ int
topicBPartitions,
+
List<Integer> topicAExpectedPartitions,
+
List<Integer> topicBExpectedPartitions,
+ String
testName
+ ) {
+ Map<String, TopicMetadata> topicMetadata = new HashMap<>();
+ topicMetadata.put("Topic-A", new TopicMetadata(Uuid.randomUuid(),
"Topic-A", topicAPartitions));
+ topicMetadata.put("Topic-B", new TopicMetadata(Uuid.randomUuid(),
"Topic-B", topicBPartitions));
+ configuredSubtopologyOne = new ConfiguredSubtopology(Set.of("Topic-A",
"Topic-B"), new HashMap<>(), new HashSet<>(), new HashMap<>());
+
+ activeTasks.put("0", Set.of(0, 1, 2, 3, 4));
+ when(streamsGroupMember.assignedTasks()).thenReturn(new
TasksTuple(activeTasks, Collections.emptyMap(), Collections.emptyMap()));
+ when(streamsGroup.partitionMetadata()).thenReturn(topicMetadata);
+
when(streamsGroup.configuredTopology()).thenReturn(Optional.of(configuredTopology));
+ SortedMap<String, ConfiguredSubtopology> configuredSubtopologyOneMap =
new TreeMap<>();
+ configuredSubtopologyOneMap.put("0", configuredSubtopologyOne);
+
when(configuredTopology.subtopologies()).thenReturn(Optional.of(configuredSubtopologyOneMap));
+
+ StreamsGroupHeartbeatResponseData.EndpointToPartitions result =
EndpointToPartitionsManager.endpointToPartitions(streamsGroupMember,
responseEndpoint, streamsGroup);
+
+ assertEquals(responseEndpoint, result.userEndpoint());
+ assertEquals(2, result.activePartitions().size());
+
+ List<StreamsGroupHeartbeatResponseData.TopicPartition> topicPartitions
= result.activePartitions();
+
topicPartitions.sort(Comparator.comparing(StreamsGroupHeartbeatResponseData.TopicPartition::topic));
+
+ StreamsGroupHeartbeatResponseData.TopicPartition topicAPartition =
result.activePartitions().get(0);
+ assertEquals("Topic-A", topicAPartition.topic());
+ assertEquals(topicAExpectedPartitions,
topicAPartition.partitions().stream().sorted().toList());
+
+ StreamsGroupHeartbeatResponseData.TopicPartition topicBPartition =
result.activePartitions().get(1);
+ assertEquals("Topic-B", topicBPartition.topic());
+ assertEquals(topicBExpectedPartitions,
topicBPartition.partitions().stream().sorted().toList());
+ }
+
+ static Stream<Arguments> argsProvider() {
+ return Stream.of(
+ arguments(2, 5, List.of(0, 1), List.of(0, 1, 2, 3, 4), "Should
assign correct partitions when partitions differ between topics"),
+ arguments(3, 3, List.of(0, 1, 2), List.of(0, 1, 2), "Should
assign correct partitions when partitions same between topics")
+ );
+ }
+}
\ No newline at end of file
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2EndpointToPartitionsIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2EndpointToPartitionsIntegrationTest.java
new file mode 100644
index 00000000000..7cad5998701
--- /dev/null
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2EndpointToPartitionsIntegrationTest.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsMetadata;
+import org.apache.kafka.streams.ThreadMetadata;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Stream;
+
+import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+
+@Timeout(600)
+@Tag("integration")
+public class IQv2EndpointToPartitionsIntegrationTest {
+ private String appId;
+ private String inputTopicTwoPartitions;
+ private String outputTopicTwoPartitions;
+ private Properties streamsApplicationProperties = new Properties();
+ private Properties streamsSecondApplicationProperties = new Properties();
+
+ private static EmbeddedKafkaCluster cluster;
+ private static final int NUM_BROKERS = 3;
+ private static final String EXPECTED_STORE_NAME = "IQTest-count";
+
+ public void startCluster(final int standbyConfig) throws IOException {
+ final Properties properties = new Properties();
+
properties.put(GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG,
standbyConfig);
+ cluster =
EmbeddedKafkaCluster.withStreamsRebalanceProtocol(NUM_BROKERS, properties);
+ cluster.start();
+ }
+
+ public void setUp() throws InterruptedException {
+ appId = safeUniqueTestName("endpointIntegrationTest");
+ inputTopicTwoPartitions = appId + "-input-two";
+ outputTopicTwoPartitions = appId + "-output-two";
+ cluster.createTopic(inputTopicTwoPartitions, 2, 1);
+ cluster.createTopic(outputTopicTwoPartitions, 2, 1);
+ }
+
+ public void closeCluster() {
+ cluster.stop();
+ }
+
+ @AfterEach
+ public void tearDown() throws Exception {
+
IntegrationTestUtils.purgeLocalStreamsState(streamsApplicationProperties);
+ if (!streamsSecondApplicationProperties.isEmpty()) {
+
IntegrationTestUtils.purgeLocalStreamsState(streamsSecondApplicationProperties);
+ }
+ }
+
+ @ParameterizedTest(name = "{3}")
+ @MethodSource("groupProtocolParameters")
+ public void shouldGetCorrectHostPartitionInformation(final String
groupProtocolConfig,
+ final boolean
usingStandbyReplicas,
+ final int
numStandbyReplicas,
+ final String
testName) throws Exception {
+ try {
+ startCluster(usingStandbyReplicas ? numStandbyReplicas : 0);
+ setUp();
+
+ final Properties streamOneProperties = new Properties();
+ streamOneProperties.put(StreamsConfig.STATE_DIR_CONFIG,
TestUtils.tempDirectory(appId).getPath() + "-ks1");
+ streamOneProperties.put(StreamsConfig.CLIENT_ID_CONFIG, appId +
"-ks1");
+ streamOneProperties.put(StreamsConfig.APPLICATION_SERVER_CONFIG,
"localhost:2020");
+ streamOneProperties.put(StreamsConfig.GROUP_PROTOCOL_CONFIG,
groupProtocolConfig);
+ if (usingStandbyReplicas) {
+
streamOneProperties.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG,
numStandbyReplicas);
+ }
+ streamsApplicationProperties = props(streamOneProperties);
+
+ final Properties streamTwoProperties = new Properties();
+ streamTwoProperties.put(StreamsConfig.STATE_DIR_CONFIG,
TestUtils.tempDirectory(appId).getPath() + "-ks2");
+ streamTwoProperties.put(StreamsConfig.CLIENT_ID_CONFIG, appId +
"-ks2");
+ streamTwoProperties.put(StreamsConfig.APPLICATION_SERVER_CONFIG,
"localhost:3030");
+ streamTwoProperties.put(StreamsConfig.GROUP_PROTOCOL_CONFIG,
groupProtocolConfig);
+ if (usingStandbyReplicas) {
+
streamTwoProperties.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG,
numStandbyReplicas);
+ }
+ streamsSecondApplicationProperties = props(streamTwoProperties);
+
+ final Topology topology = complexTopology();
+ try (final KafkaStreams streamsOne = new KafkaStreams(topology,
streamsApplicationProperties)) {
+
IntegrationTestUtils.startApplicationAndWaitUntilRunning(streamsOne);
+ waitForCondition(() ->
!streamsOne.metadataForAllStreamsClients().isEmpty(),
+ IntegrationTestUtils.DEFAULT_TIMEOUT,
+ () -> "Kafka Streams didn't get metadata about the
client.");
+ waitForCondition(() ->
streamsOne.metadataForAllStreamsClients().iterator().next().topicPartitions().size()
== 4,
+ IntegrationTestUtils.DEFAULT_TIMEOUT,
+ () -> "Kafka Streams one didn't get 4 tasks");
+ final List<StreamsMetadata> streamsMetadataAllClients = new
ArrayList<>(streamsOne.metadataForAllStreamsClients());
+ assertEquals(1, streamsMetadataAllClients.size());
+ final StreamsMetadata streamsOneInitialMetadata =
streamsMetadataAllClients.get(0);
+ assertEquals(2020,
streamsOneInitialMetadata.hostInfo().port());
+ final Set<TopicPartition> topicPartitions =
streamsOneInitialMetadata.topicPartitions();
+ assertEquals(4, topicPartitions.size());
+ assertEquals(0,
streamsOneInitialMetadata.standbyTopicPartitions().size());
+
+ final long repartitionTopicTaskCount =
topicPartitions.stream().filter(tp ->
tp.topic().contains("-repartition")).count();
+ final long sourceTopicTaskCount =
topicPartitions.stream().filter(tp ->
tp.topic().contains("-input-two")).count();
+ assertEquals(2, repartitionTopicTaskCount);
+ assertEquals(2, sourceTopicTaskCount);
+ final int expectedStandbyCount = usingStandbyReplicas ? 1 : 0;
+
+ try (final KafkaStreams streamsTwo = new
KafkaStreams(topology, streamsSecondApplicationProperties)) {
+ streamsTwo.start();
+ waitForCondition(() -> KafkaStreams.State.RUNNING ==
streamsTwo.state() && KafkaStreams.State.RUNNING == streamsOne.state(),
+ IntegrationTestUtils.DEFAULT_TIMEOUT,
+ () -> "Kafka Streams one or two never transitioned
to a RUNNING state.");
+
+ waitForCondition(() -> {
+ final ThreadMetadata threadMetadata =
streamsOne.metadataForLocalThreads().iterator().next();
+ return threadMetadata.activeTasks().size() == 2 &&
threadMetadata.standbyTasks().size() == expectedStandbyCount;
+ }, TestUtils.DEFAULT_MAX_WAIT_MS,
+ "KafkaStreams one never released active tasks and
received standby task");
+
+ waitForCondition(() -> {
+ final ThreadMetadata threadMetadata =
streamsTwo.metadataForLocalThreads().iterator().next();
+ return threadMetadata.activeTasks().size() == 2 &&
threadMetadata.standbyTasks().size() == expectedStandbyCount;
+ }, TestUtils.DEFAULT_MAX_WAIT_MS,
+ "KafkaStreams two never received active tasks and
standby");
+
+ waitForCondition(() -> {
+ final List<StreamsMetadata> metadata = new
ArrayList<>(streamsTwo.metadataForAllStreamsClients());
+ return metadata.size() == 2 &&
+ metadata.get(0).standbyTopicPartitions().size()
== expectedStandbyCount &&
+ metadata.get(1).standbyTopicPartitions().size()
== expectedStandbyCount;
+ }, TestUtils.DEFAULT_MAX_WAIT_MS,
+ "Kafka Streams clients 1 and 2 never got metadata
about standby tasks");
+
+ waitForCondition(() ->
streamsOne.metadataForAllStreamsClients().iterator().next().topicPartitions().size()
== 2,
+ IntegrationTestUtils.DEFAULT_TIMEOUT,
+ () -> "Kafka Streams one didn't give up active
tasks");
+
+ final List<StreamsMetadata> allClientMetadataUpdated = new
ArrayList<>(streamsTwo.metadataForAllStreamsClients());
+
+ final StreamsMetadata streamsOneMetadata =
allClientMetadataUpdated.get(0);
+ final Set<TopicPartition> streamsOneActiveTopicPartitions
= streamsOneMetadata.topicPartitions();
+ final Set<TopicPartition> streamsOneStandbyTopicPartitions
= streamsOneMetadata.standbyTopicPartitions();
+ final Set<String> streamsOneStoreNames =
streamsOneMetadata.stateStoreNames();
+ final Set<String> streamsOneStandbyStoreNames =
streamsOneMetadata.standbyStateStoreNames();
+
+ assertEquals(2020, streamsOneMetadata.hostInfo().port());
+ assertEquals(2, streamsOneActiveTopicPartitions.size());
+ assertEquals(expectedStandbyCount,
streamsOneStandbyTopicPartitions.size());
+ assertEquals(1, streamsOneStoreNames.size());
+ assertEquals(expectedStandbyCount,
streamsOneStandbyStoreNames.size());
+ assertEquals(EXPECTED_STORE_NAME,
streamsOneStoreNames.iterator().next());
+ if (usingStandbyReplicas) {
+ assertEquals(EXPECTED_STORE_NAME,
streamsOneStandbyStoreNames.iterator().next());
+ }
+
+ final long streamsOneRepartitionTopicCount =
streamsOneActiveTopicPartitions.stream().filter(tp ->
tp.topic().contains("-repartition")).count();
+ final long streamsOneSourceTopicCount =
streamsOneActiveTopicPartitions.stream().filter(tp ->
tp.topic().contains("-input-two")).count();
+ assertEquals(1, streamsOneRepartitionTopicCount);
+ assertEquals(1, streamsOneSourceTopicCount);
+
+ final StreamsMetadata streamsTwoMetadata =
allClientMetadataUpdated.get(1);
+ final Set<TopicPartition> streamsTwoActiveTopicPartitions
= streamsTwoMetadata.topicPartitions();
+ final Set<TopicPartition> streamsTwoStandbyTopicPartitions
= streamsTwoMetadata.standbyTopicPartitions();
+ final Set<String> streamsTwoStateStoreNames =
streamsTwoMetadata.stateStoreNames();
+ final Set<String> streamsTwoStandbyStateStoreNames =
streamsTwoMetadata.standbyStateStoreNames();
+
+ assertEquals(3030, streamsTwoMetadata.hostInfo().port());
+ assertEquals(2, streamsTwoActiveTopicPartitions.size());
+ assertEquals(expectedStandbyCount,
streamsTwoStandbyTopicPartitions.size());
+ assertEquals(1, streamsTwoStateStoreNames.size());
+ assertEquals(expectedStandbyCount,
streamsTwoStandbyStateStoreNames.size());
+ assertEquals(EXPECTED_STORE_NAME,
streamsTwoStateStoreNames.iterator().next());
+ if (usingStandbyReplicas) {
+ assertEquals(EXPECTED_STORE_NAME,
streamsTwoStandbyStateStoreNames.iterator().next());
+ }
+
+ final long streamsTwoRepartitionTopicCount =
streamsTwoActiveTopicPartitions.stream().filter(tp ->
tp.topic().contains("-repartition")).count();
+ final long streamsTwoSourceTopicCount =
streamsTwoActiveTopicPartitions.stream().filter(tp ->
tp.topic().contains("-input-two")).count();
+ assertEquals(1, streamsTwoRepartitionTopicCount);
+ assertEquals(1, streamsTwoSourceTopicCount);
+
+ if (usingStandbyReplicas) {
+ final TopicPartition streamsOneStandbyTopicPartition =
streamsOneStandbyTopicPartitions.iterator().next();
+ final TopicPartition streamsTwoStandbyTopicPartition =
streamsTwoStandbyTopicPartitions.iterator().next();
+ final String streamsOneStandbyTopicName =
streamsOneStandbyTopicPartition.topic();
+ final String streamsTwoStandbyTopicName =
streamsTwoStandbyTopicPartition.topic();
+ assertEquals(streamsOneStandbyTopicName,
streamsTwoStandbyTopicName);
+
assertNotEquals(streamsOneStandbyTopicPartition.partition(),
streamsTwoStandbyTopicPartition.partition());
+ }
+ }
+ }
+ } finally {
+ closeCluster();
+ }
+ }
+
+ private static Stream<Arguments> groupProtocolParameters() {
+ return Stream.of(Arguments.of("streams", false, 0, "STREAMS protocol
No standby"),
+ Arguments.of("classic", false, 0, "CLASSIC protocol No
standby"),
+ Arguments.of("streams", true, 1, "STREAMS protocol With
standby"),
+ Arguments.of("classic", true, 1, "CLASSIC protocol With
standby"));
+ }
+
+ private Properties props(final Properties extraProperties) {
+ final Properties streamsConfiguration = new Properties();
+ streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
+ streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
cluster.bootstrapServers());
+
streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
+ streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG,
TestUtils.tempDirectory(appId).getPath());
+ streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.StringSerde.class);
+
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.StringSerde.class);
+ streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"earliest");
+ streamsConfiguration.putAll(extraProperties);
+ return streamsConfiguration;
+ }
+
+ private Topology complexTopology() {
+ final StreamsBuilder builder = new StreamsBuilder();
+ builder.stream(inputTopicTwoPartitions, Consumed.with(Serdes.String(),
Serdes.String()))
+ .flatMapValues(value ->
Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
+ .groupBy((key, value) -> value, Grouped.as("IQTest"))
+ .count(Materialized.as(EXPECTED_STORE_NAME))
+ .toStream().to(outputTopicTwoPartitions,
Produced.with(Serdes.String(), Serdes.Long()));
+ return builder.build();
+ }
+}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index d5f67ff4fad..dcfca3ec2f5 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -1494,6 +1494,21 @@ public class StreamThread extends Thread implements
ProcessingThread {
shutdownErrorHook.run();
}
}
+
+ final Map<StreamsRebalanceData.HostInfo,
StreamsRebalanceData.EndpointPartitions> partitionsByEndpoint =
+ streamsRebalanceData.get().partitionsByHost();
+ final Map<HostInfo, Set<TopicPartition>> activeHostInfoMap = new
HashMap<>();
+ final Map<HostInfo, Set<TopicPartition>> standbyHostInfoMap = new
HashMap<>();
+
+ partitionsByEndpoint.forEach((hostInfo, endpointPartitions) -> {
+ activeHostInfoMap.put(new HostInfo(hostInfo.host(),
hostInfo.port()), new HashSet<>(endpointPartitions.activePartitions()));
+ standbyHostInfoMap.put(new HostInfo(hostInfo.host(),
hostInfo.port()), new HashSet<>(endpointPartitions.standbyPartitions()));
+ });
+ streamsMetadataState.onChange(
+ activeHostInfoMap,
+ standbyHostInfoMap,
+ getTopicPartitionInfo(activeHostInfoMap)
+ );
}
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 8cb2fc8cdfe..96090aa32fa 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -3805,6 +3805,11 @@ public class StreamThreadTest {
final Runnable shutdownErrorHook = mock(Runnable.class);
final Properties props = configProps(false, false, false);
+ final StreamsMetadataState streamsMetadataState = new
StreamsMetadataState(
+ new TopologyMetadata(internalTopologyBuilder, new
StreamsConfig(props)),
+ StreamsMetadataState.UNKNOWN_HOST,
+ new LogContext(String.format("stream-client [%s] ", CLIENT_ID))
+ );
final StreamsConfig config = new StreamsConfig(props);
thread = new StreamThread(
new MockTime(1),
@@ -3828,7 +3833,7 @@ public class StreamThreadTest {
HANDLER,
null,
Optional.of(streamsRebalanceData),
- null
+ streamsMetadataState
).updateThreadMetadata(adminClientId(CLIENT_ID));
thread.setState(State.STARTING);
@@ -3860,6 +3865,11 @@ public class StreamThreadTest {
final Properties props = configProps(false, false, false);
final Runnable shutdownErrorHook = mock(Runnable.class);
final StreamsConfig config = new StreamsConfig(props);
+ final StreamsMetadataState streamsMetadataState = new
StreamsMetadataState(
+ new TopologyMetadata(internalTopologyBuilder, config),
+ StreamsMetadataState.UNKNOWN_HOST,
+ new LogContext(String.format("stream-client [%s] ", CLIENT_ID))
+ );
thread = new StreamThread(
new MockTime(1),
config,
@@ -3882,7 +3892,7 @@ public class StreamThreadTest {
HANDLER,
null,
Optional.of(streamsRebalanceData),
- null
+ streamsMetadataState
).updateThreadMetadata(adminClientId(CLIENT_ID));
thread.setState(State.STARTING);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/utils/TestUtils.java
b/streams/src/test/java/org/apache/kafka/streams/utils/TestUtils.java
index 96d19dbb47e..2e13e3e6d9b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/utils/TestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/utils/TestUtils.java
@@ -90,7 +90,7 @@ public class TestUtils {
return safeUniqueTestName(methodName);
}
- private static String safeUniqueTestName(final String testName) {
+ public static String safeUniqueTestName(final String testName) {
return sanitize(testName + Uuid.randomUuid().toString());
}