This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch 4.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.2 by this push:
new 2d096d8c427 MINOR: Cache endpoint-to-partitions computation in streams
group heartbeat (#21526)
2d096d8c427 is described below
commit 2d096d8c42761338251186cfac2ceba726b96917
Author: Lucas Brutschy <[email protected]>
AuthorDate: Wed Feb 25 09:48:46 2026 +0100
MINOR: Cache endpoint-to-partitions computation in streams group heartbeat
(#21526)
When the endpoint information epoch known to a member is outdated, the
broker recomputes the endpoint-to-partitions mapping for all members
from scratch. This is expensive because it happens for each member
individually as that member catches up to the new epoch.
This commit adds a per-member cache for endpoint-to-partitions results
in StreamsGroup, keyed by member ID. Cache entries are explicitly
invalidated when a member changes, when a member is removed, or when
the configured topology changes.
The computation and endpoint-building logic is moved from
GroupMetadataManager into StreamsGroup. This keeps the cache management
colocated with the member lifecycle methods that need to invalidate it
(updateMember, removeMember, setConfiguredTopology), and also avoids the
overhead of the Collections.unmodifiableMap wrapper returned by the
public members() accessor.
Tests cover cache population, retrieval, and invalidation on task
change, member removal, topology change, and preservation when tasks are
unchanged.
Reviewers: Matthias J. Sax <[email protected]>
---
.../coordinator/group/GroupMetadataManager.java | 24 +-----
.../coordinator/group/streams/StreamsGroup.java | 91 +++++++++++++++++++++
.../group/streams/StreamsGroupTest.java | 95 ++++++++++++++++++++++
3 files changed, 188 insertions(+), 22 deletions(-)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 2e0651c199f..23a8f02b3ea 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -161,7 +161,6 @@ import
org.apache.kafka.coordinator.group.streams.assignor.TaskAssignor;
import
org.apache.kafka.coordinator.group.streams.assignor.TaskAssignorException;
import org.apache.kafka.coordinator.group.streams.topics.ConfiguredSubtopology;
import org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology;
-import
org.apache.kafka.coordinator.group.streams.topics.EndpointToPartitionsManager;
import org.apache.kafka.coordinator.group.streams.topics.InternalTopicManager;
import
org.apache.kafka.coordinator.group.streams.topics.TopicConfigurationException;
import org.apache.kafka.server.authorizer.Action;
@@ -2135,6 +2134,7 @@ public class GroupMetadataManager {
response.setActiveTasks(createStreamsGroupHeartbeatResponseTaskIdsFromEpochs(updatedMember.assignedTasks().activeTasksWithEpochs()));
response.setStandbyTasks(createStreamsGroupHeartbeatResponseTaskIds(updatedMember.assignedTasks().standbyTasks()));
response.setWarmupTasks(createStreamsGroupHeartbeatResponseTaskIds(updatedMember.assignedTasks().warmupTasks()));
+
group.invalidateCachedEndpointToPartitions(updatedMember.memberId());
if (updatedMember.userEndpoint().isPresent()) {
// If no user endpoint is defined, there is no change in the
endpoint information.
// Otherwise, bump the endpoint information epoch
@@ -2143,7 +2143,7 @@ public class GroupMetadataManager {
}
if (group.endpointInformationEpoch() != memberEndpointEpoch) {
-
response.setPartitionsByUserEndpoint(maybeBuildEndpointToPartitions(group,
updatedMember));
+
response.setPartitionsByUserEndpoint(group.buildEndpointToPartitions(updatedMember,
metadataImage));
}
if (groups.containsKey(group.groupId())) {
// If we just created the group, the endpoint information epoch
will not be persisted, so return epoch 0.
@@ -2260,26 +2260,6 @@ public class GroupMetadataManager {
.collect(Collectors.toList());
}
- private List<StreamsGroupHeartbeatResponseData.EndpointToPartitions>
maybeBuildEndpointToPartitions(StreamsGroup group,
-
StreamsGroupMember updatedMember) {
- List<StreamsGroupHeartbeatResponseData.EndpointToPartitions>
endpointToPartitionsList = new ArrayList<>();
- final Map<String, StreamsGroupMember> members = group.members();
- // Build endpoint information for all members except the updated member
- for (Map.Entry<String, StreamsGroupMember> entry : members.entrySet())
{
- if (updatedMember != null &&
entry.getKey().equals(updatedMember.memberId())) {
- continue;
- }
-
EndpointToPartitionsManager.maybeEndpointToPartitions(entry.getValue(), group,
metadataImage)
- .ifPresent(endpointToPartitionsList::add);
- }
- // Always build endpoint information for the updated member (whether
new or existing)
- if (updatedMember != null) {
-
EndpointToPartitionsManager.maybeEndpointToPartitions(updatedMember, group,
metadataImage)
- .ifPresent(endpointToPartitionsList::add);
- }
- return endpointToPartitionsList;
- }
-
/**
* Handles a regular heartbeat from a consumer group member. It mainly
consists of
* three parts:
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
index 621a8f9d1bd..69e81b355af 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
@@ -22,6 +22,7 @@ import
org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
+import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.utils.LogContext;
@@ -35,6 +36,7 @@ import
org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
import org.apache.kafka.coordinator.group.Utils;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
import org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology;
+import
org.apache.kafka.coordinator.group.streams.topics.EndpointToPartitionsManager;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
import org.apache.kafka.timeline.TimelineInteger;
@@ -43,6 +45,7 @@ import org.apache.kafka.timeline.TimelineObject;
import org.slf4j.Logger;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -216,6 +219,13 @@ public class StreamsGroup implements Group {
* This is used to determine when assignment configuration changes should
trigger a rebalance.
*/
private TimelineHashMap<String, String> lastAssignmentConfigs;
+
+ /**
+ * Cache for endpoint-to-partitions mappings, keyed by member ID.
+ * Entries are explicitly invalidated when a member's tasks change, the
member is removed,
+ * or the topology changes.
+ */
+ private final Map<String,
StreamsGroupHeartbeatResponseData.EndpointToPartitions>
endpointToPartitionsCache = new HashMap<>();
public StreamsGroup(
LogContext logContext,
@@ -290,6 +300,8 @@ public class StreamsGroup implements Group {
public void setConfiguredTopology(ConfiguredTopology configuredTopology) {
this.configuredTopology.set(Optional.ofNullable(configuredTopology));
+ // Clear endpoint cache since subtopology source topics may have
changed
+ endpointToPartitionsCache.clear();
}
/**
@@ -433,6 +445,7 @@ public class StreamsGroup implements Group {
maybeUpdateTaskProcessId(oldMember, newMember);
updateStaticMember(newMember);
maybeUpdateGroupState();
+ endpointToPartitionsCache.remove(newMember.memberId());
}
/**
@@ -456,6 +469,7 @@ public class StreamsGroup implements Group {
maybeRemoveTaskProcessId(oldMember);
removeStaticMember(oldMember);
maybeUpdateGroupState();
+ endpointToPartitionsCache.remove(memberId);
}
/**
@@ -1127,6 +1141,83 @@ public class StreamsGroup implements Group {
this.endpointInformationEpoch = endpointInformationEpoch;
}
+ // Visible for testing
+ Optional<StreamsGroupHeartbeatResponseData.EndpointToPartitions>
cachedEndpointToPartitions(
+ String memberId
+ ) {
+ return Optional.ofNullable(endpointToPartitionsCache.get(memberId));
+ }
+
+ // Visible for testing
+ void cacheEndpointToPartitions(
+ String memberId,
+ StreamsGroupHeartbeatResponseData.EndpointToPartitions
endpointToPartitions
+ ) {
+ endpointToPartitionsCache.put(memberId, endpointToPartitions);
+ }
+
+ /**
+ * Invalidates the cached endpoint-to-partitions entry for the given
member.
+ * This should be called when a member's assigned tasks change during
reconciliation,
+ * before record replay has had a chance to call updateMember().
+ *
+ * @param memberId The member ID whose cache entry should be invalidated.
+ */
+ public void invalidateCachedEndpointToPartitions(String memberId) {
+ endpointToPartitionsCache.remove(memberId);
+ }
+
+ /**
+ * Builds the endpoint-to-partitions list for all members, using the cache
where possible.
+ *
+ * @param updatedMember The member that was just updated (may have a stale
entry in the members map).
+ * @param metadataImage The current metadata image for resolving topic
partitions.
+ * @return The list of endpoint-to-partitions mappings for all members
with endpoints.
+ */
+ public List<StreamsGroupHeartbeatResponseData.EndpointToPartitions>
buildEndpointToPartitions(
+ StreamsGroupMember updatedMember,
+ CoordinatorMetadataImage metadataImage
+ ) {
+ List<StreamsGroupHeartbeatResponseData.EndpointToPartitions>
endpointToPartitionsList = new ArrayList<>();
+ if (updatedMember == null) {
+ log.error("[GroupId {}] updatedMember is unexpectedly null in
buildEndpointToPartitions. " +
+ "This is a bug, please file a JIRA ticket.", groupId);
+ return endpointToPartitionsList;
+ }
+ for (Map.Entry<String, StreamsGroupMember> entry : members.entrySet())
{
+ if (entry.getKey().equals(updatedMember.memberId())) {
+ continue;
+ }
+ getOrComputeEndpointToPartitions(entry.getValue(), metadataImage)
+ .ifPresent(endpointToPartitionsList::add);
+ }
+ getOrComputeEndpointToPartitions(updatedMember, metadataImage)
+ .ifPresent(endpointToPartitionsList::add);
+ return endpointToPartitionsList;
+ }
+
+ private Optional<StreamsGroupHeartbeatResponseData.EndpointToPartitions>
getOrComputeEndpointToPartitions(
+ StreamsGroupMember member,
+ CoordinatorMetadataImage metadataImage
+ ) {
+ if (member.userEndpoint().isEmpty()) {
+ return Optional.empty();
+ }
+
+ String memberId = member.memberId();
+
+ StreamsGroupHeartbeatResponseData.EndpointToPartitions cached =
endpointToPartitionsCache.get(memberId);
+ if (cached != null) {
+ return Optional.of(cached);
+ }
+
+ Optional<StreamsGroupHeartbeatResponseData.EndpointToPartitions>
computed =
+ EndpointToPartitionsManager.maybeEndpointToPartitions(member,
this, metadataImage);
+ computed.ifPresent(endpointToPartitions ->
+ endpointToPartitionsCache.put(memberId, endpointToPartitions));
+ return computed;
+ }
+
/**
* @return The assignment configurations for this streams group.
*/
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
index f015441f86f..44dc0d4615e 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
@@ -23,6 +23,7 @@ import
org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
+import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.utils.LogContext;
@@ -1246,4 +1247,98 @@ public class StreamsGroupTest {
verify(timer).cancel("initial-rebalance-timeout-test-group");
}
+
+ // Endpoint-to-partitions cache tests
+
+ @Test
+ public void testGetCachedEndpointToPartitionsReturnsEmptyWhenNoCache() {
+ StreamsGroup streamsGroup = createStreamsGroup("test-group");
+
+ Optional<StreamsGroupHeartbeatResponseData.EndpointToPartitions>
cached =
+ streamsGroup.cachedEndpointToPartitions("member-1");
+
+ assertTrue(cached.isEmpty());
+ }
+
+ @Test
+ public void testCacheEndpointToPartitionsAndRetrieve() {
+ StreamsGroup streamsGroup = createStreamsGroup("test-group");
+ StreamsGroupHeartbeatResponseData.EndpointToPartitions
endpointToPartitions =
+ new StreamsGroupHeartbeatResponseData.EndpointToPartitions();
+ endpointToPartitions.setUserEndpoint(
+ new
StreamsGroupHeartbeatResponseData.Endpoint().setHost("localhost").setPort(9092)
+ );
+
+ streamsGroup.cacheEndpointToPartitions("member-1",
endpointToPartitions);
+ Optional<StreamsGroupHeartbeatResponseData.EndpointToPartitions>
cached =
+ streamsGroup.cachedEndpointToPartitions("member-1");
+
+ assertTrue(cached.isPresent());
+ assertEquals(endpointToPartitions, cached.get());
+ }
+
+ @Test
+ public void testUpdateMemberInvalidatesCache() {
+ StreamsGroup streamsGroup = createStreamsGroup("test-group");
+ StreamsGroupHeartbeatResponseData.EndpointToPartitions
endpointToPartitions =
+ new StreamsGroupHeartbeatResponseData.EndpointToPartitions();
+
+ // Create initial member
+ StreamsGroupMember member =
StreamsGroupMember.Builder.withDefaults("member-1")
+ .setProcessId("process-1")
+ .build();
+ streamsGroup.updateMember(member);
+
+ // Cache endpoint info
+ streamsGroup.cacheEndpointToPartitions("member-1",
endpointToPartitions);
+
+ // Update member
+ StreamsGroupMember updatedMember =
StreamsGroupMember.Builder.withDefaults("member-1")
+ .setProcessId("process-1")
+ .build();
+ streamsGroup.updateMember(updatedMember);
+
+ // Cache should be invalidated
+
assertTrue(streamsGroup.cachedEndpointToPartitions("member-1").isEmpty());
+ }
+
+ @Test
+ public void testRemoveMemberRemovesCacheEntry() {
+ StreamsGroup streamsGroup = createStreamsGroup("test-group");
+ StreamsGroupHeartbeatResponseData.EndpointToPartitions
endpointToPartitions =
+ new StreamsGroupHeartbeatResponseData.EndpointToPartitions();
+
+ // Create member
+ StreamsGroupMember member =
StreamsGroupMember.Builder.withDefaults("member-1")
+ .setProcessId("process-1")
+ .build();
+ streamsGroup.updateMember(member);
+
+ // Cache endpoint info
+ streamsGroup.cacheEndpointToPartitions("member-1",
endpointToPartitions);
+
+ // Remove member
+ streamsGroup.removeMember("member-1");
+
+ // Cache should be cleared for that member
+
assertTrue(streamsGroup.cachedEndpointToPartitions("member-1").isEmpty());
+ }
+
+ @Test
+ public void testSetConfiguredTopologyClearsCache() {
+ StreamsGroup streamsGroup = createStreamsGroup("test-group");
+ StreamsGroupHeartbeatResponseData.EndpointToPartitions
endpointToPartitions =
+ new StreamsGroupHeartbeatResponseData.EndpointToPartitions();
+
+ // Cache some entries
+ streamsGroup.cacheEndpointToPartitions("member-1",
endpointToPartitions);
+ streamsGroup.cacheEndpointToPartitions("member-2",
endpointToPartitions);
+
+ // Set configured topology (even null should clear cache)
+ streamsGroup.setConfiguredTopology(null);
+
+ // All cache entries should be cleared
+
assertTrue(streamsGroup.cachedEndpointToPartitions("member-1").isEmpty());
+
assertTrue(streamsGroup.cachedEndpointToPartitions("member-2").isEmpty());
+ }
}