This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new bed5adcec47 MINOR: Cache endpoint-to-partitions computation in streams 
group heartbeat (#21526)
bed5adcec47 is described below

commit bed5adcec474f391e17e5e627cbc33eddf0dd1c5
Author: Lucas Brutschy <[email protected]>
AuthorDate: Wed Feb 25 09:48:46 2026 +0100

    MINOR: Cache endpoint-to-partitions computation in streams group heartbeat 
(#21526)
    
    When the endpoint information epoch known to a member is outdated, the
    broker recomputes the endpoint-to-partitions mapping for all members
    from scratch. This is expensive because it happens for each member
    individually as they catch up to the new epoch.
    
    This commit adds a per-member cache for endpoint-to-partitions results
    in StreamsGroup, keyed by member ID. Cache entries are explicitly
    invalidated when a member changes, when a member is removed, or when
    the configured topology changes.
    
    The computation and endpoint building logic is moved from
    GroupMetadataManager into StreamsGroup. This keeps the cache management
    colocated with the member lifecycle methods that need to invalidate it
    (updateMember, removeMember, setConfiguredTopology), and also avoids the
    overhead of the Collections.unmodifiableMap wrapper returned by the
    public members() accessor.
    
    Tests cover cache population, retrieval, and invalidation on task
    change, member removal, topology change, and preservation when tasks are
    unchanged.
    
    Reviewers: Matthias J. Sax <[email protected]>
---
 .../coordinator/group/GroupMetadataManager.java    | 24 +-----
 .../coordinator/group/streams/StreamsGroup.java    | 91 +++++++++++++++++++++
 .../group/streams/StreamsGroupTest.java            | 95 ++++++++++++++++++++++
 3 files changed, 188 insertions(+), 22 deletions(-)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index a79c3a6a8d9..e21304178c3 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -161,7 +161,6 @@ import 
org.apache.kafka.coordinator.group.streams.assignor.TaskAssignor;
 import 
org.apache.kafka.coordinator.group.streams.assignor.TaskAssignorException;
 import org.apache.kafka.coordinator.group.streams.topics.ConfiguredSubtopology;
 import org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology;
-import 
org.apache.kafka.coordinator.group.streams.topics.EndpointToPartitionsManager;
 import org.apache.kafka.coordinator.group.streams.topics.InternalTopicManager;
 import 
org.apache.kafka.coordinator.group.streams.topics.TopicConfigurationException;
 import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
@@ -2124,6 +2123,7 @@ public class GroupMetadataManager {
             
response.setActiveTasks(createStreamsGroupHeartbeatResponseTaskIdsFromEpochs(updatedMember.assignedTasks().activeTasksWithEpochs()));
             
response.setStandbyTasks(createStreamsGroupHeartbeatResponseTaskIds(updatedMember.assignedTasks().standbyTasks()));
             
response.setWarmupTasks(createStreamsGroupHeartbeatResponseTaskIds(updatedMember.assignedTasks().warmupTasks()));
+            
group.invalidateCachedEndpointToPartitions(updatedMember.memberId());
             if (updatedMember.userEndpoint().isPresent()) {
                 // If no user endpoint is defined, there is no change in the 
endpoint information.
                 // Otherwise, bump the endpoint information epoch
@@ -2132,7 +2132,7 @@ public class GroupMetadataManager {
         }
 
         if (group.endpointInformationEpoch() != memberEndpointEpoch) {
-            
response.setPartitionsByUserEndpoint(maybeBuildEndpointToPartitions(group, 
updatedMember));
+            
response.setPartitionsByUserEndpoint(group.buildEndpointToPartitions(updatedMember,
 metadataImage));
         }
         if (groups.containsKey(group.groupId())) {
             // If we just created the group, the endpoint information epoch 
will not be persisted, so return epoch 0.
@@ -2249,26 +2249,6 @@ public class GroupMetadataManager {
             .collect(Collectors.toList());
     }
 
-    private List<StreamsGroupHeartbeatResponseData.EndpointToPartitions> 
maybeBuildEndpointToPartitions(StreamsGroup group,
-                                                                               
                         StreamsGroupMember updatedMember) {
-        List<StreamsGroupHeartbeatResponseData.EndpointToPartitions> 
endpointToPartitionsList = new ArrayList<>();
-        final Map<String, StreamsGroupMember> members = group.members();
-        // Build endpoint information for all members except the updated member
-        for (Map.Entry<String, StreamsGroupMember> entry : members.entrySet()) 
{
-            if (updatedMember != null && 
entry.getKey().equals(updatedMember.memberId())) {
-                continue;
-            }
-            
EndpointToPartitionsManager.maybeEndpointToPartitions(entry.getValue(), group, 
metadataImage)
-                .ifPresent(endpointToPartitionsList::add);
-        }
-        // Always build endpoint information for the updated member (whether 
new or existing)
-        if (updatedMember != null) {
-            
EndpointToPartitionsManager.maybeEndpointToPartitions(updatedMember, group, 
metadataImage)
-                .ifPresent(endpointToPartitionsList::add);
-        }
-        return endpointToPartitionsList;
-    }
-
     /**
      * Handles a regular heartbeat from a consumer group member. It mainly 
consists of
      * three parts:
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
index 4f6c092e159..b59081f7ea3 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
@@ -22,6 +22,7 @@ import 
org.apache.kafka.common.errors.UnknownMemberIdException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.message.ListGroupsResponseData;
 import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
+import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.JoinGroupRequest;
 import org.apache.kafka.common.utils.LogContext;
@@ -35,6 +36,7 @@ import 
org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
 import org.apache.kafka.coordinator.group.Utils;
 import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
 import org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology;
+import 
org.apache.kafka.coordinator.group.streams.topics.EndpointToPartitionsManager;
 import org.apache.kafka.timeline.SnapshotRegistry;
 import org.apache.kafka.timeline.TimelineHashMap;
 import org.apache.kafka.timeline.TimelineInteger;
@@ -43,6 +45,7 @@ import org.apache.kafka.timeline.TimelineObject;
 
 import org.slf4j.Logger;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -217,6 +220,13 @@ public class StreamsGroup implements Group {
      */
     private int endpointInformationEpoch = 0;
 
+    /**
+     * Cache for endpoint-to-partitions mappings, keyed by member ID.
+     * Entries are explicitly invalidated when a member's tasks change, the 
member is removed,
+     * or the topology changes.
+     */
+    private final Map<String, 
StreamsGroupHeartbeatResponseData.EndpointToPartitions> 
endpointToPartitionsCache = new HashMap<>();
+
     public StreamsGroup(
         LogContext logContext,
         SnapshotRegistry snapshotRegistry,
@@ -290,6 +300,8 @@ public class StreamsGroup implements Group {
 
     public void setConfiguredTopology(ConfiguredTopology configuredTopology) {
         this.configuredTopology.set(Optional.ofNullable(configuredTopology));
+        // Clear endpoint cache since subtopology source topics may have 
changed
+        endpointToPartitionsCache.clear();
     }
 
     /**
@@ -433,6 +445,7 @@ public class StreamsGroup implements Group {
         maybeUpdateTaskProcessId(oldMember, newMember);
         updateStaticMember(newMember);
         maybeUpdateGroupState();
+        endpointToPartitionsCache.remove(newMember.memberId());
     }
 
     /**
@@ -456,6 +469,7 @@ public class StreamsGroup implements Group {
         maybeRemoveTaskProcessId(oldMember);
         removeStaticMember(oldMember);
         maybeUpdateGroupState();
+        endpointToPartitionsCache.remove(memberId);
     }
 
     /**
@@ -1127,6 +1141,83 @@ public class StreamsGroup implements Group {
         this.endpointInformationEpoch = endpointInformationEpoch;
     }
 
+    // Visible for testing
+    Optional<StreamsGroupHeartbeatResponseData.EndpointToPartitions> 
cachedEndpointToPartitions(
+        String memberId
+    ) {
+        return Optional.ofNullable(endpointToPartitionsCache.get(memberId));
+    }
+
+    // Visible for testing
+    void cacheEndpointToPartitions(
+        String memberId,
+        StreamsGroupHeartbeatResponseData.EndpointToPartitions 
endpointToPartitions
+    ) {
+        endpointToPartitionsCache.put(memberId, endpointToPartitions);
+    }
+
+    /**
+     * Invalidates the cached endpoint-to-partitions entry for the given 
member.
+     * This should be called when a member's assigned tasks change during 
reconciliation,
+     * before record replay has had a chance to call updateMember().
+     *
+     * @param memberId The member ID whose cache entry should be invalidated.
+     */
+    public void invalidateCachedEndpointToPartitions(String memberId) {
+        endpointToPartitionsCache.remove(memberId);
+    }
+
+    /**
+     * Builds the endpoint-to-partitions list for all members, using the cache 
where possible.
+     *
+     * @param updatedMember The member that was just updated (may have a stale 
entry in the members map).
+     * @param metadataImage The current metadata image for resolving topic 
partitions.
+     * @return The list of endpoint-to-partitions mappings for all members 
with endpoints.
+     */
+    public List<StreamsGroupHeartbeatResponseData.EndpointToPartitions> 
buildEndpointToPartitions(
+        StreamsGroupMember updatedMember,
+        CoordinatorMetadataImage metadataImage
+    ) {
+        List<StreamsGroupHeartbeatResponseData.EndpointToPartitions> 
endpointToPartitionsList = new ArrayList<>();
+        if (updatedMember == null) {
+            log.error("[GroupId {}] updatedMember is unexpectedly null in 
buildEndpointToPartitions. " +
+                "This is a bug, please file a JIRA ticket.", groupId);
+            return endpointToPartitionsList;
+        }
+        for (Map.Entry<String, StreamsGroupMember> entry : members.entrySet()) 
{
+            if (entry.getKey().equals(updatedMember.memberId())) {
+                continue;
+            }
+            getOrComputeEndpointToPartitions(entry.getValue(), metadataImage)
+                .ifPresent(endpointToPartitionsList::add);
+        }
+        getOrComputeEndpointToPartitions(updatedMember, metadataImage)
+            .ifPresent(endpointToPartitionsList::add);
+        return endpointToPartitionsList;
+    }
+
+    private Optional<StreamsGroupHeartbeatResponseData.EndpointToPartitions> 
getOrComputeEndpointToPartitions(
+        StreamsGroupMember member,
+        CoordinatorMetadataImage metadataImage
+    ) {
+        if (member.userEndpoint().isEmpty()) {
+            return Optional.empty();
+        }
+
+        String memberId = member.memberId();
+
+        StreamsGroupHeartbeatResponseData.EndpointToPartitions cached = 
endpointToPartitionsCache.get(memberId);
+        if (cached != null) {
+            return Optional.of(cached);
+        }
+
+        Optional<StreamsGroupHeartbeatResponseData.EndpointToPartitions> 
computed =
+            EndpointToPartitionsManager.maybeEndpointToPartitions(member, 
this, metadataImage);
+        computed.ifPresent(endpointToPartitions ->
+            endpointToPartitionsCache.put(memberId, endpointToPartitions));
+        return computed;
+    }
+
     /**
      * @return The assignment configurations for this streams group.
      */
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
index fa696c0a1ef..3a144d00dda 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
@@ -23,6 +23,7 @@ import 
org.apache.kafka.common.errors.UnknownMemberIdException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.message.ListGroupsResponseData;
 import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
+import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.ApiMessage;
 import org.apache.kafka.common.utils.LogContext;
@@ -1247,4 +1248,98 @@ public class StreamsGroupTest {
 
         verify(timer).cancel("initial-rebalance-timeout-test-group");
     }
+
+    // Endpoint-to-partitions cache tests
+
+    @Test
+    public void testGetCachedEndpointToPartitionsReturnsEmptyWhenNoCache() {
+        StreamsGroup streamsGroup = createStreamsGroup("test-group");
+
+        Optional<StreamsGroupHeartbeatResponseData.EndpointToPartitions> 
cached =
+            streamsGroup.cachedEndpointToPartitions("member-1");
+
+        assertTrue(cached.isEmpty());
+    }
+
+    @Test
+    public void testCacheEndpointToPartitionsAndRetrieve() {
+        StreamsGroup streamsGroup = createStreamsGroup("test-group");
+        StreamsGroupHeartbeatResponseData.EndpointToPartitions 
endpointToPartitions =
+            new StreamsGroupHeartbeatResponseData.EndpointToPartitions();
+        endpointToPartitions.setUserEndpoint(
+            new 
StreamsGroupHeartbeatResponseData.Endpoint().setHost("localhost").setPort(9092)
+        );
+
+        streamsGroup.cacheEndpointToPartitions("member-1", 
endpointToPartitions);
+        Optional<StreamsGroupHeartbeatResponseData.EndpointToPartitions> 
cached =
+            streamsGroup.cachedEndpointToPartitions("member-1");
+
+        assertTrue(cached.isPresent());
+        assertEquals(endpointToPartitions, cached.get());
+    }
+
+    @Test
+    public void testUpdateMemberInvalidatesCache() {
+        StreamsGroup streamsGroup = createStreamsGroup("test-group");
+        StreamsGroupHeartbeatResponseData.EndpointToPartitions 
endpointToPartitions =
+            new StreamsGroupHeartbeatResponseData.EndpointToPartitions();
+
+        // Create initial member
+        StreamsGroupMember member = 
StreamsGroupMember.Builder.withDefaults("member-1")
+            .setProcessId("process-1")
+            .build();
+        streamsGroup.updateMember(member);
+
+        // Cache endpoint info
+        streamsGroup.cacheEndpointToPartitions("member-1", 
endpointToPartitions);
+
+        // Update member
+        StreamsGroupMember updatedMember = 
StreamsGroupMember.Builder.withDefaults("member-1")
+            .setProcessId("process-1")
+            .build();
+        streamsGroup.updateMember(updatedMember);
+
+        // Cache should be invalidated
+        
assertTrue(streamsGroup.cachedEndpointToPartitions("member-1").isEmpty());
+    }
+
+    @Test
+    public void testRemoveMemberRemovesCacheEntry() {
+        StreamsGroup streamsGroup = createStreamsGroup("test-group");
+        StreamsGroupHeartbeatResponseData.EndpointToPartitions 
endpointToPartitions =
+            new StreamsGroupHeartbeatResponseData.EndpointToPartitions();
+
+        // Create member
+        StreamsGroupMember member = 
StreamsGroupMember.Builder.withDefaults("member-1")
+            .setProcessId("process-1")
+            .build();
+        streamsGroup.updateMember(member);
+
+        // Cache endpoint info
+        streamsGroup.cacheEndpointToPartitions("member-1", 
endpointToPartitions);
+
+        // Remove member
+        streamsGroup.removeMember("member-1");
+
+        // Cache should be cleared for that member
+        
assertTrue(streamsGroup.cachedEndpointToPartitions("member-1").isEmpty());
+    }
+
+    @Test
+    public void testSetConfiguredTopologyClearsCache() {
+        StreamsGroup streamsGroup = createStreamsGroup("test-group");
+        StreamsGroupHeartbeatResponseData.EndpointToPartitions 
endpointToPartitions =
+            new StreamsGroupHeartbeatResponseData.EndpointToPartitions();
+
+        // Cache some entries
+        streamsGroup.cacheEndpointToPartitions("member-1", 
endpointToPartitions);
+        streamsGroup.cacheEndpointToPartitions("member-2", 
endpointToPartitions);
+
+        // Set configured topology (even null should clear cache)
+        streamsGroup.setConfiguredTopology(null);
+
+        // All cache entries should be cleared
+        
assertTrue(streamsGroup.cachedEndpointToPartitions("member-1").isEmpty());
+        
assertTrue(streamsGroup.cachedEndpointToPartitions("member-2").isEmpty());
+    }
 }

Reply via email to