vamossagar12 commented on code in PR #14911:
URL: https://github.com/apache/kafka/pull/14911#discussion_r1415815684


##########
core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala:
##########
@@ -137,6 +137,99 @@ class ConsumerGroupHeartbeatRequestTest(cluster: 
ClusterInstance) {
     assertEquals(-1, consumerGroupHeartbeatResponse.data.memberEpoch)
   }
 
+  @ClusterTest(clusterType = Type.KRAFT, serverProperties = Array(
+    new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+    new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1"),
+    new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value 
= "5000"),
+    new ClusterConfigProperty(key = "group.consumer.min.session.timeout.ms", 
value = "5000")
+  ))
+  def 
testStaticMemberRemovedAfterSessionTimeoutExpiryWhenNewGroupCoordinatorIsEnabled():
 Unit = {
+    val raftCluster = cluster.asInstanceOf[RaftClusterInstance]
+    val admin = cluster.createAdminClient()
+    val instanceId = "instanceId"
+
+    // Creates the __consumer_offsets topics because it won't be created 
automatically
+    // in this test because it does not use FindCoordinator API.
+    TestUtils.createOffsetsTopicWithAdmin(
+      admin = admin,
+      brokers = 
raftCluster.brokers.collect(Collectors.toList[BrokerServer]).asScala,
+      controllers = raftCluster.controllerServers().asScala.toSeq
+    )
+
+    // Heartbeat request to join the group. Note that the member subscribes
+    // to an nonexistent topic.
+    var consumerGroupHeartbeatRequest = new 
ConsumerGroupHeartbeatRequest.Builder(
+      new ConsumerGroupHeartbeatRequestData()
+        .setGroupId("grp")
+        .setInstanceId(instanceId)
+        .setMemberEpoch(0)
+        .setRebalanceTimeoutMs(5 * 60 * 1000)
+        .setSubscribedTopicNames(List("foo").asJava)
+        .setTopicPartitions(List.empty.asJava)
+    ).build()
+
+    // Send the request until receiving a successful response. There is a delay
+    // here because the group coordinator is loaded in the background.
+    var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
+    TestUtils.waitUntilTrue(() => {
+      consumerGroupHeartbeatResponse = 
connectAndReceive(consumerGroupHeartbeatRequest)
+      consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
+    }, msg = s"Could not join the group successfully. Last response 
$consumerGroupHeartbeatResponse.")
+
+    // Verify the response.
+    assertNotNull(consumerGroupHeartbeatResponse.data.memberId)
+    assertEquals(1, consumerGroupHeartbeatResponse.data.memberEpoch)
+    assertEquals(new ConsumerGroupHeartbeatResponseData.Assignment(), 
consumerGroupHeartbeatResponse.data.assignment)
+
+    // Create the topic.
+    val topicId = TestUtils.createTopicWithAdminRaw(
+      admin = admin,
+      topic = "foo",
+      numPartitions = 3
+    )
+
+    // Prepare the next heartbeat.
+    consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
+      new ConsumerGroupHeartbeatRequestData()
+        .setGroupId("grp")
+        .setInstanceId(instanceId)
+        .setMemberId(consumerGroupHeartbeatResponse.data.memberId)
+        .setMemberEpoch(consumerGroupHeartbeatResponse.data.memberEpoch)
+    ).build()
+
+    // This is the expected assignment.
+    val expectedAssignment = new 
ConsumerGroupHeartbeatResponseData.Assignment()
+      .setTopicPartitions(List(new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+        .setTopicId(topicId)
+        .setPartitions(List[Integer](0, 1, 2).asJava)).asJava)
+
+    // Heartbeats until the partitions are assigned.
+    consumerGroupHeartbeatResponse = null
+    TestUtils.waitUntilTrue(() => {
+      consumerGroupHeartbeatResponse = 
connectAndReceive(consumerGroupHeartbeatRequest)
+      consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code &&
+        consumerGroupHeartbeatResponse.data.assignment == expectedAssignment
+    }, msg = s"Could not get partitions assigned. Last response 
$consumerGroupHeartbeatResponse.")
+
+    // Verify the response.
+    assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)
+    assertEquals(expectedAssignment, 
consumerGroupHeartbeatResponse.data.assignment)
+
+    // No heartbeats sent till session timeout expires
+    Thread.sleep(6000L)

Review Comment:
   I tried this approach and it worked fine. Thanks 👍 



##########
core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala:
##########
@@ -137,6 +137,99 @@ class ConsumerGroupHeartbeatRequestTest(cluster: 
ClusterInstance) {
     assertEquals(-1, consumerGroupHeartbeatResponse.data.memberEpoch)
   }
 
+  @ClusterTest(clusterType = Type.KRAFT, serverProperties = Array(
+    new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+    new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1"),
+    new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value 
= "5000"),
+    new ClusterConfigProperty(key = "group.consumer.min.session.timeout.ms", 
value = "5000")
+  ))
+  def 
testStaticMemberRemovedAfterSessionTimeoutExpiryWhenNewGroupCoordinatorIsEnabled():
 Unit = {
+    val raftCluster = cluster.asInstanceOf[RaftClusterInstance]
+    val admin = cluster.createAdminClient()
+    val instanceId = "instanceId"
+
+    // Creates the __consumer_offsets topics because it won't be created 
automatically
+    // in this test because it does not use FindCoordinator API.
+    TestUtils.createOffsetsTopicWithAdmin(
+      admin = admin,
+      brokers = 
raftCluster.brokers.collect(Collectors.toList[BrokerServer]).asScala,
+      controllers = raftCluster.controllerServers().asScala.toSeq
+    )
+
+    // Heartbeat request to join the group. Note that the member subscribes
+    // to an nonexistent topic.
+    var consumerGroupHeartbeatRequest = new 
ConsumerGroupHeartbeatRequest.Builder(
+      new ConsumerGroupHeartbeatRequestData()
+        .setGroupId("grp")
+        .setInstanceId(instanceId)
+        .setMemberEpoch(0)
+        .setRebalanceTimeoutMs(5 * 60 * 1000)
+        .setSubscribedTopicNames(List("foo").asJava)
+        .setTopicPartitions(List.empty.asJava)
+    ).build()
+
+    // Send the request until receiving a successful response. There is a delay
+    // here because the group coordinator is loaded in the background.
+    var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
+    TestUtils.waitUntilTrue(() => {
+      consumerGroupHeartbeatResponse = 
connectAndReceive(consumerGroupHeartbeatRequest)
+      consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
+    }, msg = s"Could not join the group successfully. Last response 
$consumerGroupHeartbeatResponse.")
+
+    // Verify the response.
+    assertNotNull(consumerGroupHeartbeatResponse.data.memberId)
+    assertEquals(1, consumerGroupHeartbeatResponse.data.memberEpoch)
+    assertEquals(new ConsumerGroupHeartbeatResponseData.Assignment(), 
consumerGroupHeartbeatResponse.data.assignment)
+
+    // Create the topic.
+    val topicId = TestUtils.createTopicWithAdminRaw(
+      admin = admin,
+      topic = "foo",
+      numPartitions = 3
+    )
+
+    // Prepare the next heartbeat.
+    consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
+      new ConsumerGroupHeartbeatRequestData()
+        .setGroupId("grp")
+        .setInstanceId(instanceId)
+        .setMemberId(consumerGroupHeartbeatResponse.data.memberId)
+        .setMemberEpoch(consumerGroupHeartbeatResponse.data.memberEpoch)
+    ).build()
+
+    // This is the expected assignment.
+    val expectedAssignment = new 
ConsumerGroupHeartbeatResponseData.Assignment()
+      .setTopicPartitions(List(new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+        .setTopicId(topicId)
+        .setPartitions(List[Integer](0, 1, 2).asJava)).asJava)
+
+    // Heartbeats until the partitions are assigned.
+    consumerGroupHeartbeatResponse = null
+    TestUtils.waitUntilTrue(() => {
+      consumerGroupHeartbeatResponse = 
connectAndReceive(consumerGroupHeartbeatRequest)
+      consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code &&
+        consumerGroupHeartbeatResponse.data.assignment == expectedAssignment
+    }, msg = s"Could not get partitions assigned. Last response 
$consumerGroupHeartbeatResponse.")
+
+    // Verify the response.
+    assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)
+    assertEquals(expectedAssignment, 
consumerGroupHeartbeatResponse.data.assignment)
+
+    // No heartbeats sent till session timeout expires
+    Thread.sleep(6000L)

Review Comment:
   I tried this approach and it worked fine. Thanks 👍 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to