This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch 4.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.2 by this push:
     new 2d096d8c427 MINOR: Cache endpoint-to-partitions computation in streams 
group heartbeat (#21526)
2d096d8c427 is described below

commit 2d096d8c42761338251186cfac2ceba726b96917
Author: Lucas Brutschy <[email protected]>
AuthorDate: Wed Feb 25 09:48:46 2026 +0100

    MINOR: Cache endpoint-to-partitions computation in streams group heartbeat 
(#21526)
    
    When the endpoint information epoch known to a member is outdated, the
    broker recomputes the endpoint-to-partitions mapping for all members
    from scratch. This is expensive because it happens for each member
    individually as they catch up to the new epoch.
    
    This commit adds a per-member cache for endpoint-to-partitions results
    in StreamsGroup, keyed by member ID. Cache entries are explicitly
invalidated when a member changes, when a member is removed, or when
    the configured topology changes.
    
    The computation and endpoint building logic is moved from
    GroupMetadataManager into StreamsGroup. This keeps the cache management
    colocated with the member lifecycle methods that need to invalidate it
    (updateMember, removeMember, setConfiguredTopology), and also avoids the
    overhead of the Collections.unmodifiableMap wrapper returned by the
    public members() accessor.
    
    Tests cover cache population, retrieval, and invalidation on task
    change, member removal, topology change, and preservation when tasks are
    unchanged.
    
    Reviewers: Matthias J. Sax <[email protected]>
---
 .../coordinator/group/GroupMetadataManager.java    | 24 +-----
 .../coordinator/group/streams/StreamsGroup.java    | 91 +++++++++++++++++++++
 .../group/streams/StreamsGroupTest.java            | 95 ++++++++++++++++++++++
 3 files changed, 188 insertions(+), 22 deletions(-)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 2e0651c199f..23a8f02b3ea 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -161,7 +161,6 @@ import 
org.apache.kafka.coordinator.group.streams.assignor.TaskAssignor;
 import 
org.apache.kafka.coordinator.group.streams.assignor.TaskAssignorException;
 import org.apache.kafka.coordinator.group.streams.topics.ConfiguredSubtopology;
 import org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology;
-import 
org.apache.kafka.coordinator.group.streams.topics.EndpointToPartitionsManager;
 import org.apache.kafka.coordinator.group.streams.topics.InternalTopicManager;
 import 
org.apache.kafka.coordinator.group.streams.topics.TopicConfigurationException;
 import org.apache.kafka.server.authorizer.Action;
@@ -2135,6 +2134,7 @@ public class GroupMetadataManager {
             
response.setActiveTasks(createStreamsGroupHeartbeatResponseTaskIdsFromEpochs(updatedMember.assignedTasks().activeTasksWithEpochs()));
             
response.setStandbyTasks(createStreamsGroupHeartbeatResponseTaskIds(updatedMember.assignedTasks().standbyTasks()));
             
response.setWarmupTasks(createStreamsGroupHeartbeatResponseTaskIds(updatedMember.assignedTasks().warmupTasks()));
+            
group.invalidateCachedEndpointToPartitions(updatedMember.memberId());
             if (updatedMember.userEndpoint().isPresent()) {
                 // If no user endpoint is defined, there is no change in the 
endpoint information.
                 // Otherwise, bump the endpoint information epoch
@@ -2143,7 +2143,7 @@ public class GroupMetadataManager {
         }
 
         if (group.endpointInformationEpoch() != memberEndpointEpoch) {
-            
response.setPartitionsByUserEndpoint(maybeBuildEndpointToPartitions(group, 
updatedMember));
+            
response.setPartitionsByUserEndpoint(group.buildEndpointToPartitions(updatedMember,
 metadataImage));
         }
         if (groups.containsKey(group.groupId())) {
             // If we just created the group, the endpoint information epoch 
will not be persisted, so return epoch 0.
@@ -2260,26 +2260,6 @@ public class GroupMetadataManager {
             .collect(Collectors.toList());
     }
 
-    private List<StreamsGroupHeartbeatResponseData.EndpointToPartitions> 
maybeBuildEndpointToPartitions(StreamsGroup group,
-                                                                               
                         StreamsGroupMember updatedMember) {
-        List<StreamsGroupHeartbeatResponseData.EndpointToPartitions> 
endpointToPartitionsList = new ArrayList<>();
-        final Map<String, StreamsGroupMember> members = group.members();
-        // Build endpoint information for all members except the updated member
-        for (Map.Entry<String, StreamsGroupMember> entry : members.entrySet()) 
{
-            if (updatedMember != null && 
entry.getKey().equals(updatedMember.memberId())) {
-                continue;
-            }
-            
EndpointToPartitionsManager.maybeEndpointToPartitions(entry.getValue(), group, 
metadataImage)
-                .ifPresent(endpointToPartitionsList::add);
-        }
-        // Always build endpoint information for the updated member (whether 
new or existing)
-        if (updatedMember != null) {
-            
EndpointToPartitionsManager.maybeEndpointToPartitions(updatedMember, group, 
metadataImage)
-                .ifPresent(endpointToPartitionsList::add);
-        }
-        return endpointToPartitionsList;
-    }
-
     /**
      * Handles a regular heartbeat from a consumer group member. It mainly 
consists of
      * three parts:
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
index 621a8f9d1bd..69e81b355af 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
@@ -22,6 +22,7 @@ import 
org.apache.kafka.common.errors.UnknownMemberIdException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.message.ListGroupsResponseData;
 import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
+import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.JoinGroupRequest;
 import org.apache.kafka.common.utils.LogContext;
@@ -35,6 +36,7 @@ import 
org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
 import org.apache.kafka.coordinator.group.Utils;
 import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
 import org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology;
+import 
org.apache.kafka.coordinator.group.streams.topics.EndpointToPartitionsManager;
 import org.apache.kafka.timeline.SnapshotRegistry;
 import org.apache.kafka.timeline.TimelineHashMap;
 import org.apache.kafka.timeline.TimelineInteger;
@@ -43,6 +45,7 @@ import org.apache.kafka.timeline.TimelineObject;
 
 import org.slf4j.Logger;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -216,6 +219,13 @@ public class StreamsGroup implements Group {
      * This is used to determine when assignment configuration changes should 
trigger a rebalance.
      */
     private TimelineHashMap<String, String> lastAssignmentConfigs;
+    
+    /**
+     * Cache for endpoint-to-partitions mappings, keyed by member ID.
+     * Entries are explicitly invalidated when a member's tasks change, the 
member is removed,
+     * or the topology changes.
+     */
+    private final Map<String, 
StreamsGroupHeartbeatResponseData.EndpointToPartitions> 
endpointToPartitionsCache = new HashMap<>();
 
     public StreamsGroup(
         LogContext logContext,
@@ -290,6 +300,8 @@ public class StreamsGroup implements Group {
 
     public void setConfiguredTopology(ConfiguredTopology configuredTopology) {
         this.configuredTopology.set(Optional.ofNullable(configuredTopology));
+        // Clear endpoint cache since subtopology source topics may have 
changed
+        endpointToPartitionsCache.clear();
     }
 
     /**
@@ -433,6 +445,7 @@ public class StreamsGroup implements Group {
         maybeUpdateTaskProcessId(oldMember, newMember);
         updateStaticMember(newMember);
         maybeUpdateGroupState();
+        endpointToPartitionsCache.remove(newMember.memberId());
     }
 
     /**
@@ -456,6 +469,7 @@ public class StreamsGroup implements Group {
         maybeRemoveTaskProcessId(oldMember);
         removeStaticMember(oldMember);
         maybeUpdateGroupState();
+        endpointToPartitionsCache.remove(memberId);
     }
 
     /**
@@ -1127,6 +1141,83 @@ public class StreamsGroup implements Group {
         this.endpointInformationEpoch = endpointInformationEpoch;
     }
 
+    // Visible for testing
+    Optional<StreamsGroupHeartbeatResponseData.EndpointToPartitions> 
cachedEndpointToPartitions(
+        String memberId
+    ) {
+        return Optional.ofNullable(endpointToPartitionsCache.get(memberId));
+    }
+
+    // Visible for testing
+    void cacheEndpointToPartitions(
+        String memberId,
+        StreamsGroupHeartbeatResponseData.EndpointToPartitions 
endpointToPartitions
+    ) {
+        endpointToPartitionsCache.put(memberId, endpointToPartitions);
+    }
+
+    /**
+     * Invalidates the cached endpoint-to-partitions entry for the given 
member.
+     * This should be called when a member's assigned tasks change during 
reconciliation,
+     * before record replay has had a chance to call updateMember().
+     *
+     * @param memberId The member ID whose cache entry should be invalidated.
+     */
+    public void invalidateCachedEndpointToPartitions(String memberId) {
+        endpointToPartitionsCache.remove(memberId);
+    }
+
+    /**
+     * Builds the endpoint-to-partitions list for all members, using the cache 
where possible.
+     *
+     * @param updatedMember The member that was just updated (may have a stale 
entry in the members map).
+     * @param metadataImage The current metadata image for resolving topic 
partitions.
+     * @return The list of endpoint-to-partitions mappings for all members 
with endpoints.
+     */
+    public List<StreamsGroupHeartbeatResponseData.EndpointToPartitions> 
buildEndpointToPartitions(
+        StreamsGroupMember updatedMember,
+        CoordinatorMetadataImage metadataImage
+    ) {
+        List<StreamsGroupHeartbeatResponseData.EndpointToPartitions> 
endpointToPartitionsList = new ArrayList<>();
+        if (updatedMember == null) {
+            log.error("[GroupId {}] updatedMember is unexpectedly null in 
buildEndpointToPartitions. " +
+                "This is a bug, please file a JIRA ticket.", groupId);
+            return endpointToPartitionsList;
+        }
+        for (Map.Entry<String, StreamsGroupMember> entry : members.entrySet()) 
{
+            if (entry.getKey().equals(updatedMember.memberId())) {
+                continue;
+            }
+            getOrComputeEndpointToPartitions(entry.getValue(), metadataImage)
+                .ifPresent(endpointToPartitionsList::add);
+        }
+        getOrComputeEndpointToPartitions(updatedMember, metadataImage)
+            .ifPresent(endpointToPartitionsList::add);
+        return endpointToPartitionsList;
+    }
+
+    private Optional<StreamsGroupHeartbeatResponseData.EndpointToPartitions> 
getOrComputeEndpointToPartitions(
+        StreamsGroupMember member,
+        CoordinatorMetadataImage metadataImage
+    ) {
+        if (member.userEndpoint().isEmpty()) {
+            return Optional.empty();
+        }
+
+        String memberId = member.memberId();
+
+        StreamsGroupHeartbeatResponseData.EndpointToPartitions cached = 
endpointToPartitionsCache.get(memberId);
+        if (cached != null) {
+            return Optional.of(cached);
+        }
+
+        Optional<StreamsGroupHeartbeatResponseData.EndpointToPartitions> 
computed =
+            EndpointToPartitionsManager.maybeEndpointToPartitions(member, 
this, metadataImage);
+        computed.ifPresent(endpointToPartitions ->
+            endpointToPartitionsCache.put(memberId, endpointToPartitions));
+        return computed;
+    }
+
     /**
      * @return The assignment configurations for this streams group.
      */
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
index f015441f86f..44dc0d4615e 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
@@ -23,6 +23,7 @@ import 
org.apache.kafka.common.errors.UnknownMemberIdException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.message.ListGroupsResponseData;
 import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
+import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.ApiMessage;
 import org.apache.kafka.common.utils.LogContext;
@@ -1246,4 +1247,98 @@ public class StreamsGroupTest {
 
         verify(timer).cancel("initial-rebalance-timeout-test-group");
     }
+
+    // Endpoint-to-partitions cache tests
+
+    @Test
+    public void testGetCachedEndpointToPartitionsReturnsEmptyWhenNoCache() {
+        StreamsGroup streamsGroup = createStreamsGroup("test-group");
+
+        Optional<StreamsGroupHeartbeatResponseData.EndpointToPartitions> 
cached =
+            streamsGroup.cachedEndpointToPartitions("member-1");
+
+        assertTrue(cached.isEmpty());
+    }
+
+    @Test
+    public void testCacheEndpointToPartitionsAndRetrieve() {
+        StreamsGroup streamsGroup = createStreamsGroup("test-group");
+        StreamsGroupHeartbeatResponseData.EndpointToPartitions 
endpointToPartitions =
+            new StreamsGroupHeartbeatResponseData.EndpointToPartitions();
+        endpointToPartitions.setUserEndpoint(
+            new 
StreamsGroupHeartbeatResponseData.Endpoint().setHost("localhost").setPort(9092)
+        );
+
+        streamsGroup.cacheEndpointToPartitions("member-1", 
endpointToPartitions);
+        Optional<StreamsGroupHeartbeatResponseData.EndpointToPartitions> 
cached =
+            streamsGroup.cachedEndpointToPartitions("member-1");
+
+        assertTrue(cached.isPresent());
+        assertEquals(endpointToPartitions, cached.get());
+    }
+
+    @Test
+    public void testUpdateMemberInvalidatesCache() {
+        StreamsGroup streamsGroup = createStreamsGroup("test-group");
+        StreamsGroupHeartbeatResponseData.EndpointToPartitions 
endpointToPartitions =
+            new StreamsGroupHeartbeatResponseData.EndpointToPartitions();
+
+        // Create initial member
+        StreamsGroupMember member = 
StreamsGroupMember.Builder.withDefaults("member-1")
+            .setProcessId("process-1")
+            .build();
+        streamsGroup.updateMember(member);
+
+        // Cache endpoint info
+        streamsGroup.cacheEndpointToPartitions("member-1", 
endpointToPartitions);
+
+        // Update member
+        StreamsGroupMember updatedMember = 
StreamsGroupMember.Builder.withDefaults("member-1")
+            .setProcessId("process-1")
+            .build();
+        streamsGroup.updateMember(updatedMember);
+
+        // Cache should be invalidated
+        
assertTrue(streamsGroup.cachedEndpointToPartitions("member-1").isEmpty());
+    }
+
+    @Test
+    public void testRemoveMemberRemovesCacheEntry() {
+        StreamsGroup streamsGroup = createStreamsGroup("test-group");
+        StreamsGroupHeartbeatResponseData.EndpointToPartitions 
endpointToPartitions =
+            new StreamsGroupHeartbeatResponseData.EndpointToPartitions();
+
+        // Create member
+        StreamsGroupMember member = 
StreamsGroupMember.Builder.withDefaults("member-1")
+            .setProcessId("process-1")
+            .build();
+        streamsGroup.updateMember(member);
+
+        // Cache endpoint info
+        streamsGroup.cacheEndpointToPartitions("member-1", 
endpointToPartitions);
+
+        // Remove member
+        streamsGroup.removeMember("member-1");
+
+        // Cache should be cleared for that member
+        
assertTrue(streamsGroup.cachedEndpointToPartitions("member-1").isEmpty());
+    }
+
+    @Test
+    public void testSetConfiguredTopologyClearsCache() {
+        StreamsGroup streamsGroup = createStreamsGroup("test-group");
+        StreamsGroupHeartbeatResponseData.EndpointToPartitions 
endpointToPartitions =
+            new StreamsGroupHeartbeatResponseData.EndpointToPartitions();
+
+        // Cache some entries
+        streamsGroup.cacheEndpointToPartitions("member-1", 
endpointToPartitions);
+        streamsGroup.cacheEndpointToPartitions("member-2", 
endpointToPartitions);
+
+        // Set configured topology (even null should clear cache)
+        streamsGroup.setConfiguredTopology(null);
+
+        // All cache entries should be cleared
+        
assertTrue(streamsGroup.cachedEndpointToPartitions("member-1").isEmpty());
+        
assertTrue(streamsGroup.cachedEndpointToPartitions("member-2").isEmpty());
+    }
 }

Reply via email to