dajac commented on code in PR #14882:
URL: https://github.com/apache/kafka/pull/14882#discussion_r1412090130


##########
core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala:
##########
@@ -137,6 +137,98 @@ class ConsumerGroupHeartbeatRequestTest(cluster: 
ClusterInstance) {
     assertEquals(-1, consumerGroupHeartbeatResponse.data.memberEpoch)
   }
 
+  @ClusterTest(clusterType = Type.KRAFT, serverProperties = Array(
+    new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+    new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def 
testRejoiningStaticMemberGetsAssignmentsBackWhenNewGroupCoordinatorIsEnabled(): 
Unit = {
+    val raftCluster = cluster.asInstanceOf[RaftClusterInstance]
+    val admin = cluster.createAdminClient()
+    val instanceId = "instanceId"
+
+    // Creates the __consumer_offsets topics because it won't be created 
automatically
+    // in this test because it does not use FindCoordinator API.
+    TestUtils.createOffsetsTopicWithAdmin(
+      admin = admin,
+      brokers = 
raftCluster.brokers.collect(Collectors.toList[BrokerServer]).asScala,
+      controllers = raftCluster.controllerServers().asScala.toSeq
+    )
+
+    // Create the topic.
+    val topicId = TestUtils.createTopicWithAdminRaw(
+      admin = admin,
+      topic = "foo",
+      numPartitions = 3
+    )
+
+    // Heartbeat request so that a static member joins the group
+    var consumerGroupHeartbeatRequest = new 
ConsumerGroupHeartbeatRequest.Builder(
+      new ConsumerGroupHeartbeatRequestData()
+        .setGroupId("grp")
+        .setInstanceId(instanceId)
+        .setMemberEpoch(0)
+        .setRebalanceTimeoutMs(5 * 60 * 1000)
+        .setSubscribedTopicNames(List("foo").asJava)
+        .setTopicPartitions(List.empty.asJava)
+    ).build()
+
+    // This is the expected assignment.
+    val expectedAssignment = new 
ConsumerGroupHeartbeatResponseData.Assignment()
+      .setTopicPartitions(List(new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+        .setTopicId(topicId)
+        .setPartitions(List[Integer](0, 1, 2).asJava)).asJava)
+
+    // Send the request until receiving a successful response. There is a delay
+    // here because the group coordinator is loaded in the background.
+    var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
+    TestUtils.waitUntilTrue(() => {
+      consumerGroupHeartbeatResponse = 
connectAndReceive(consumerGroupHeartbeatRequest)
+      consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
+    }, msg = s"Could not join the group successfully and get partitions 
assigned. Last response $consumerGroupHeartbeatResponse.")

Review Comment:
   @vamossagar12 The issue is here. Basically, when the heartbeat (HB) is 
processed by the group coordinator, the metadata for topic `foo` is not 
available yet, so the assignment is empty. I think that you should keep sending 
heartbeats here until you get the desired assignment. Keep in mind that you have 
to update the memberId and memberEpoch in each subsequent request.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to