dajac commented on code in PR #21720:
URL: https://github.com/apache/kafka/pull/21720#discussion_r2929809725


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -254,6 +254,7 @@ public void testConsumerHeartbeatRegexValidation() {
         assignor.prepareGroupAssignment(new GroupAssignment(Map.of()));
         GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
             
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, 
List.of(assignor))
+            
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
 0)

Review Comment:
   Why do we need this change here?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########


Review Comment:
   I am a little surprised that the new default config does not break existing 
tests.



##########
core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala:
##########
@@ -683,22 +683,31 @@ class GroupCoordinatorBaseRequestTest(cluster: 
ClusterInstance) {
     (joinGroupResponseData.memberId, joinGroupResponseData.generationId)
   }
 
-  protected def joinConsumerGroupWithNewProtocol(groupId: String, memberId: 
String = ""): (String, Int) = {
+  protected def joinConsumerGroupWithNewProtocol(
+    groupId: String,
+    memberId: String = "",
+    expectedMemberEpoch: Int = 0
+  ): (String, Int) = {
     val consumerGroupHeartbeatResponseData = consumerGroupHeartbeat(
       groupId = groupId,
       memberId = memberId,
       rebalanceTimeoutMs = 5 * 60 * 1000,
       subscribedTopicNames = List("foo"),
-      topicPartitions = List.empty
+      topicPartitions = List.empty,
+      expectedMemberEpoch = expectedMemberEpoch
     )
     (consumerGroupHeartbeatResponseData.memberId, 
consumerGroupHeartbeatResponseData.memberEpoch)
   }
 
-  protected def joinConsumerGroup(groupId: String, useNewProtocol: Boolean): 
(String, Int) = {
+  protected def joinConsumerGroup(
+    groupId: String,
+    useNewProtocol: Boolean,
+    expectedMemberEpoch: Int = 0

Review Comment:
   I am not a fan of this because you only wire it for the new protocol. I 
think that the caller should rather take care of retrying when they have to.



##########
core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala:
##########
@@ -126,7 +126,8 @@ class ConsumerGroupDescribeRequestTest(cluster: 
ClusterInstance) extends GroupCo
       serverAssignor = "range",
       rebalanceTimeoutMs = timeoutMs,
       subscribedTopicNames = List("foo"),
-      topicPartitions = List.empty
+      topicPartitions = List.empty,
+      expectedMemberEpoch = grp2Member1Response.memberEpoch + 1

Review Comment:
   Why don't we follow the pattern used by `grp2Member1Response`? It would 
ensure that we have an assignment and it seems to be what we want here.



##########
core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala:
##########
@@ -277,18 +278,34 @@ class ConsumerProtocolMigrationTest(cluster: 
ClusterInstance) extends GroupCoord
     val groupId = "grp"
 
     // Consumer member 1 joins the group.
-    val (memberId1, _) = joinConsumerGroupWithNewProtocol(groupId, 
Uuid.randomUuid.toString)
+    val (memberId1, memberEpoch1) = joinConsumerGroupWithNewProtocol(groupId, 
Uuid.randomUuid.toString)
 
     // Classic member 2 joins the group.
-    val joinGroupResponseData = sendJoinRequest(
-      groupId = groupId
-    )
     val memberId2 = sendJoinRequest(
-      groupId = groupId,
-      memberId = joinGroupResponseData.memberId,
-      metadata = metadata(List.empty)
+      groupId = groupId
     ).memberId
 
+    // Wait until the group's target assignment is updated.

Review Comment:
   Have you considered disabling batching for existing tests and adding new 
integration tests to cover the new feature? We kind of lose coverage like 
this... 



##########
core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala:
##########
@@ -3178,7 +3178,9 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
     // in the broker and client can lead to other issues. This needs to be 
fixed properly by using
     // member permissions while computing assignments.
     var member2Response = 
sendAndReceiveFirstRegexHeartbeat("memberWithLimitedAccess", listenerName)
-    member1Response = sendAndReceiveRegexHeartbeat(member1Response, 
interBrokerListenerName, Some(1))
+    TestUtils.retry(15000) {
+      member1Response = sendAndReceiveRegexHeartbeat(member1Response, 
interBrokerListenerName, Some(1))
+    }

Review Comment:
   Why is this required now?



##########
core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala:
##########
@@ -99,8 +99,10 @@ class ListGroupsRequestTest(cluster: ClusterInstance) 
extends GroupCoordinatorBa
           .setGroupType(if (version >= 5) Group.GroupType.CONSUMER.toString 
else "")
 
         // Create grp-5 in new protocol. Then member 2 joins grp-5, triggering 
a rebalance. Grp-5 is in RECONCILING state.
-        memberId1InGroup5 = joinConsumerGroup("grp-5", useNewProtocol = 
true)._1
-        memberId2InGroup5 = joinConsumerGroup("grp-5", useNewProtocol = 
true)._1
+        val memberIdAndEpoch1InGroup5 = joinConsumerGroup("grp-5", 
useNewProtocol = true)
+        memberId1InGroup5 = memberIdAndEpoch1InGroup5._1
+        val memberEpoch1InGroup5 = memberIdAndEpoch1InGroup5._2
+        memberId2InGroup5 = joinConsumerGroup("grp-5", useNewProtocol = true, 
expectedMemberEpoch = memberEpoch1InGroup5 + 1)._1

Review Comment:
   ditto. Should we just disable batching here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to