lucasbru commented on code in PR #20702:
URL: https://github.com/apache/kafka/pull/20702#discussion_r2431739687


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupBuilder.java:
##########
@@ -85,7 +85,7 @@ public List<CoordinatorRecord> build() {
 
         // Add group epoch record.
         records.add(
-            StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, groupEpoch, metadataHash, validatedTopologyEpoch));
+            StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, groupEpoch, metadataHash, validatedTopologyEpoch, Map.of()));

Review Comment:
   I think we should add assignmentConfigs to the builder as well and pass it here.
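   A minimal sketch of what the builder change might look like (plain `java.util` maps stand in for the real record plumbing, and the `withAssignmentConfigs` name is an assumption, not the actual builder API):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the suggested StreamsGroupBuilder change: hold the configs in the
// builder so build() can pass them to newStreamsGroupMetadataRecord instead of
// hard-coding Map.of(). Names here are assumptions, not the real builder API.
class StreamsGroupBuilderSketch {
    private final Map<String, String> assignmentConfigs = new HashMap<>();

    StreamsGroupBuilderSketch withAssignmentConfigs(Map<String, String> configs) {
        assignmentConfigs.putAll(configs);
        return this;
    }

    // In the real builder, build() would forward this map into the record.
    Map<String, String> assignmentConfigs() {
        return Map.copyOf(assignmentConfigs);
    }
}
```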



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java:
##########
@@ -231,6 +237,9 @@ public StreamsGroup(
         this.currentWarmupTaskToProcessIds = new TimelineHashMap<>(snapshotRegistry, 0);
         this.topology = new TimelineObject<>(snapshotRegistry, Optional.empty());
         this.configuredTopology = new TimelineObject<>(snapshotRegistry, Optional.empty());
+        this.assignmentConfigs = new TimelineHashMap<>(snapshotRegistry, 0);
+        // Set default assignment configuration

Review Comment:
   I would put any default assignment configs here. We will update the value 
inside `GroupMetadataManager`.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java:
##########
@@ -210,6 +210,12 @@ public static class DeadlineAndEpoch {
      */
     private int endpointInformationEpoch = -1;
 
+    /**
+     * The assignment configurations for this streams group.
+     * This is used to determine when assignment configuration changes should trigger a rebalance.
+     */
+    private TimelineHashMap<String, String> assignmentConfigs;

Review Comment:
   This may differ from the current assignment configuration for the group, so I wonder if we shouldn't just call it `lastUsedAssignmentConfig` instead.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -5419,6 +5429,13 @@ public void replay(
             streamsGroup.setGroupEpoch(value.epoch());
             streamsGroup.setMetadataHash(value.metadataHash());
             streamsGroup.setValidatedTopologyEpoch(value.validatedTopologyEpoch());
+
+            if (value.assignmentConfigs() != null) {
+                for (StreamsGroupMetadataValue.AssignmentConfig config : value.assignmentConfigs()) {
+                    streamsGroup.setAssignmentConfig(config.key(), config.value());

Review Comment:
   I think this would never "unset" a configuration. For example, if we remove 
a configuration, we'd probably want to also remove it from the StreamsGroup 
object.  I would change setAssignmentConfig to setAssignmentConfigs and pass in 
a whole collection. Inside the method you can clear and putAll on the internal 
collection. 
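   The clear-then-putAll replacement setter described above could look roughly like this (a sketch: a plain `HashMap` stands in for Kafka's `TimelineHashMap`, and the accessor is an assumption):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Sketch of the suggested setter: replace the whole config map instead of
// merging single entries, so keys removed from the record are also unset
// on the StreamsGroup object.
class StreamsGroupSketch {
    // Stands in for the TimelineHashMap used by the real StreamsGroup.
    private final Map<String, String> assignmentConfigs = new HashMap<>();

    // Clear-then-putAll ensures a key absent from newConfigs is dropped.
    void setAssignmentConfigs(Map<String, String> newConfigs) {
        Objects.requireNonNull(newConfigs, "newConfigs should not be null");
        assignmentConfigs.clear();
        assignmentConfigs.putAll(newConfigs);
    }

    Map<String, String> assignmentConfigs() {
        return Map.copyOf(assignmentConfigs);
    }
}
```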



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java:
##########
@@ -100,18 +100,30 @@ public static CoordinatorRecord newStreamsGroupMetadataRecord(
         String groupId,
         int newGroupEpoch,
         long metadataHash,
-        int validatedTopologyEpoch
+        int validatedTopologyEpoch,
+        Map<String, String> assignmentConfigs
     ) {
         Objects.requireNonNull(groupId, "groupId should not be null here");

Review Comment:
   I think we should also require assignmentConfigs to be non-null here and add 
a corresponding test. When we read a record, it may happen that the record does 
not have the field (yet) and is null. But when we write a record, we should 
always have assignmentConfigs.
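   The suggested precondition could look like this (a sketch; the record type is simplified to a `String` so the snippet is self-contained):

```java
import java.util.Map;
import java.util.Objects;

// Sketch of the suggested guard: when writing a record, the caller must
// always supply a (possibly empty) config map, never null. Null configs are
// only acceptable when *reading* old records that lack the field.
class RecordHelperSketch {
    static String newMetadataRecord(String groupId, Map<String, String> assignmentConfigs) {
        Objects.requireNonNull(groupId, "groupId should not be null here");
        Objects.requireNonNull(assignmentConfigs, "assignmentConfigs should not be null here");
        // The real helper builds a CoordinatorRecord; a string stands in here.
        return groupId + ":" + assignmentConfigs;
    }
}
```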



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java:
##########
@@ -100,18 +100,30 @@ public static CoordinatorRecord newStreamsGroupMetadataRecord(
         String groupId,
         int newGroupEpoch,
         long metadataHash,
-        int validatedTopologyEpoch
+        int validatedTopologyEpoch,
+        Map<String, String> assignmentConfigs
     ) {
         Objects.requireNonNull(groupId, "groupId should not be null here");
 
+        List<StreamsGroupMetadataValue.AssignmentConfig> assignmentConfigList = new ArrayList<>();
+        if (assignmentConfigs != null && !assignmentConfigs.isEmpty()) {

Review Comment:
   If we require non-null above, remove the check here.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpersTest.java:
##########


Review Comment:
   Please add assignment configs to this test.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -18815,6 +18815,66 @@ public void testStreamsGroupEndpointInformationOnlyWhenEpochGreater() {
         assertNull(result.response().data().partitionsByUserEndpoint());
     }
 
+    @Test
+    public void testStreamsGroupEpochIncreaseWithAssignmentConfigChanges() {
+        String groupId = "fooup";
+        String memberId = Uuid.randomUuid().toString();
+        String subtopology1 = "subtopology1";
+        String fooTopicName = "foo";
+        Uuid fooTopicId = Uuid.randomUuid();
+
+        Topology topology = new Topology().setSubtopologies(List.of(
+            new Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
+        ));
+
+        CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 6)
+            .buildCoordinatorMetadataImage();
+
+        MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withStreamsGroupTaskAssignors(List.of(assignor))
+            .withMetadataImage(metadataImage)
+            .withConfig(GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG, 0)
+            .withStreamsGroup(new StreamsGroupBuilder(groupId, 10)
+                .withMember(streamsGroupMemberBuilderWithDefaults(memberId)
+                    .setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(9)
+                    .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                        TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5)))
+                    .build())
+                .withTargetAssignment(memberId, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                    TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5)))
+                .withTargetAssignmentEpoch(10)
+                .withTopology(StreamsTopology.fromHeartbeatRequest(topology))
+            )
+            .build();
+
+        // Change the assignment config
+        Properties newConfig = new Properties();
+        newConfig.put(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG, "2");
+        context.groupConfigManager.updateGroupConfig(groupId, newConfig);
+
+        assignor.prepareGroupAssignment(
+            Map.of(memberId, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5),
+                TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2)))
+        );
+
+        context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(10));
+
+        // Verify that group epoch was bumped
+        StreamsGroup group = context.groupMetadataManager.streamsGroup(groupId);
+        int newGroupEpoch = group.groupEpoch();
+        assertEquals(11, newGroupEpoch);
+        assertEquals("2", group.assignmentConfigs().get("num.standby.replicas"));

Review Comment:
   Please also check that the records returned by `streamsGroupHeartbeat` 
contain the record with the correct updated config.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -8647,10 +8664,21 @@ private TaskAssignor streamsGroupAssignor(String groupId) {
      * Get the assignor of the provided streams group.
      */
     private Map<String, String> streamsGroupAssignmentConfigs(String groupId) {

Review Comment:
   I don't think we need to change this method. This will just return the current assignment config.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1949,6 +1949,15 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream
                 reconfigureTopology = true;
             }
 
+            // Check if assignment configurations have changed
+            Map<String, String> currentAssignmentConfigs = streamsGroupAssignmentConfigs(groupId);
+            Map<String, String> storedAssignmentConfigs = group.assignmentConfigs();
+            if (!currentAssignmentConfigs.equals(storedAssignmentConfigs)) {

Review Comment:
   maybe also skip this branch if bumpGroupEpoch is already true, otherwise we 
will get the log message even for new groups.
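   The suggested guard could be sketched as follows (variable names follow the diff; the real code would also log and emit records inside the branch, which is elided here):

```java
import java.util.Map;

// Sketch: only compare stored vs. current configs (and, in the real code,
// emit the "configs changed" log line) when the group epoch is not already
// being bumped for another reason, e.g. for a newly created group.
class ConfigChangeCheckSketch {
    static boolean maybeBumpForConfigChange(boolean bumpGroupEpoch,
                                            Map<String, String> current,
                                            Map<String, String> stored) {
        if (!bumpGroupEpoch && !current.equals(stored)) {
            // Real code: log.info(...) and mark the epoch for bumping.
            bumpGroupEpoch = true;
        }
        return bumpGroupEpoch;
    }
}
```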



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -4298,7 +4308,7 @@ private <T> CoordinatorResult<T, CoordinatorRecord> streamsGroupFenceMember(
 
         // We bump the group epoch.
         int groupEpoch = group.groupEpoch() + 1;
-        records.add(newStreamsGroupMetadataRecord(group.groupId(), groupEpoch, group.metadataHash(), group.validatedTopologyEpoch()));
+        records.add(newStreamsGroupMetadataRecord(group.groupId(), groupEpoch, group.metadataHash(), group.validatedTopologyEpoch(), Map.of()));

Review Comment:
   Wait, why are we dropping the assignment configuration on the floor here? We should use `group.assignmentConfigs()` here.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1978,7 +1987,8 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream
         int groupEpoch = group.groupEpoch();
         if (bumpGroupEpoch) {
             groupEpoch += 1;
-            records.add(newStreamsGroupMetadataRecord(groupId, groupEpoch, metadataHash, validatedTopologyEpoch));
+            Map<String, String> assignmentConfigs = streamsGroupAssignmentConfigs(groupId);

Review Comment:
   We already have currentAssignmentConfigs above, can we not use it here?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1949,6 +1949,15 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream
                 reconfigureTopology = true;
             }
 
+            // Check if assignment configurations have changed

Review Comment:
   I think this should be moved outside of this if. We can check it right 
before actually bumping the group epoch, so in a sense, it would become step 3c 
after step 3b. 



##########
group-coordinator/src/main/resources/common/message/StreamsGroupMetadataValue.json:
##########
@@ -26,6 +26,20 @@
     { "name": "MetadataHash", "versions": "0+", "type": "int64",
       "about": "The hash of all topics in the group." },
    { "name": "ValidatedTopologyEpoch", "versions": "0+", "taggedVersions": "0+", "tag": 0, "default": -1, "type": "int32",
-      "about": "The topology epoch whose topics where validated to be present in a valid configuration in the metadata." }
+      "about": "The topology epoch whose topics where validated to be present in a valid configuration in the metadata." },
+    {
+      "name": "AssignmentConfigs",
+      "type": "[]AssignmentConfig",
+      "versions": "0+",

Review Comment:
   I think we want to add this as a tagged field, so that brokers that were using EA in 4.1 already can still read the record. Something like
   
   ```"taggedVersions": "0+", "nullableVersions": "0+", "tag": 1, "default": null, ```
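   Put together, the field entry might read something like the following sketch (the `about` text is an assumption; the tag number must be unique within this schema, and tag 0 is already taken by `ValidatedTopologyEpoch`):

```json
{
  "name": "AssignmentConfigs",
  "type": "[]AssignmentConfig",
  "versions": "0+",
  "taggedVersions": "0+",
  "tag": 1,
  "nullableVersions": "0+",
  "default": null,
  "about": "The assignment configuration used when computing the current assignment."
}
```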



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########


Review Comment:
   I would pass currentAssignmentConfigs into `updateStreamsTargetAssignment` 
so that we do not have to fetch it again inside.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -18815,6 +18815,66 @@ public void testStreamsGroupEndpointInformationOnlyWhenEpochGreater() {
         assertNull(result.response().data().partitionsByUserEndpoint());
     }
 
+    @Test
+    public void testStreamsGroupEpochIncreaseWithAssignmentConfigChanges() {
+        String groupId = "fooup";
+        String memberId = Uuid.randomUuid().toString();
+        String subtopology1 = "subtopology1";
+        String fooTopicName = "foo";
+        Uuid fooTopicId = Uuid.randomUuid();
+
+        Topology topology = new Topology().setSubtopologies(List.of(
+            new Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
+        ));
+
+        CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 6)
+            .buildCoordinatorMetadataImage();
+
+        MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withStreamsGroupTaskAssignors(List.of(assignor))
+            .withMetadataImage(metadataImage)
+            .withConfig(GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG, 0)
+            .withStreamsGroup(new StreamsGroupBuilder(groupId, 10)
+                .withMember(streamsGroupMemberBuilderWithDefaults(memberId)
+                    .setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(9)
+                    .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                        TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5)))
+                    .build())
+                .withTargetAssignment(memberId, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                    TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5)))
+                .withTargetAssignmentEpoch(10)
+                .withTopology(StreamsTopology.fromHeartbeatRequest(topology))

Review Comment:
   I think we have to set `verifiedTopologyEpoch` to 0 here, otherwise the 
group epoch will be bumped because of the topology verification. 



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -9739,7 +9739,7 @@ public void testStreamsGroupDescribeBeforeAndAfterCommittingOffset() {
         StreamsGroupMember.Builder memberBuilder1 = streamsGroupMemberBuilderWithDefaults(memberId1);
         context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(streamsGroupId, memberBuilder1.build()));
         context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(streamsGroupId, memberBuilder1.build()));
-        context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(streamsGroupId, epoch + 1, 0, -1));
+        context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(streamsGroupId, epoch + 1, 0, -1, null));

Review Comment:
   These tests will have to be updated to not pass `null` anymore but 
`Map.of()`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to