This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 53e117253d2 KAFKA-19807: Add RPC-level integration tests for StreamsGroupHeartbeat [2/2] (#20762)
53e117253d2 is described below

commit 53e117253d21b2cd00e69d6ab91dae0ee84432f2
Author: lucliu1108 <[email protected]>
AuthorDate: Thu Oct 30 14:46:16 2025 -0500

    KAFKA-19807: Add RPC-level integration tests for StreamsGroupHeartbeat [2/2] (#20762)
    
    ## What
    Ticket: https://issues.apache.org/jira/browse/KAFKA-19807
    Follow up on https://github.com/apache/kafka/pull/20757, add tests that:
    1. Test member with topology that triggers internal topic auto creation
    2. Test member heartbeat under dynamic configuration
    (`streams.num.standby.replicas`) change
    3. Test membership expiring and rejoining
    4. Test member heartbeat before and after group coordinator's restart
    
    Reviewers: Lucas Brutschy <[email protected]>
    
    ---------
    
    Co-authored-by: Copilot <[email protected]>
---
 .../server/StreamsGroupHeartbeatRequestTest.scala  | 510 ++++++++++++++++++++-
 1 file changed, 506 insertions(+), 4 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala b/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala
index e3b73a7ff6d..ea955e72313 100644
--- a/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala
@@ -17,6 +17,8 @@
 package kafka.server
 
 import kafka.utils.TestUtils
+import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry}
+import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.message.{StreamsGroupHeartbeatRequestData, 
StreamsGroupHeartbeatResponseData}
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.test.ClusterInstance
@@ -24,7 +26,7 @@ import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterFeature,
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
 import org.apache.kafka.common.errors.UnsupportedVersionException
 import org.apache.kafka.server.common.Feature
-import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull, 
assertThrows, assertTrue}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull, 
assertNull, assertThrows, assertTrue}
 
 import scala.jdk.CollectionConverters._
 
@@ -33,7 +35,8 @@ import scala.jdk.CollectionConverters._
   serverProperties = Array(
     new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
     new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
-    new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"true")
+    new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", 
value = "classic,consumer,streams")
   )
 )
 class StreamsGroupHeartbeatRequestTest(cluster: ClusterInstance) extends 
GroupCoordinatorBaseRequestTest(cluster) {
@@ -512,9 +515,488 @@ class StreamsGroupHeartbeatRequestTest(cluster: ClusterInstance) extends GroupCo
     }
   }
 
+  @ClusterTest
+  def testInternalTopicsCreation(): Unit = {
+    val admin = cluster.admin()
+    val memberId = "test-member-1"
+    val groupId = "test-group"
+    val inputTopicName = "input-topic"
+
+    try {
+      TestUtils.createOffsetsTopicWithAdmin(
+        admin = admin,
+        brokers = cluster.brokers.values().asScala.toSeq,
+        controllers = cluster.controllers().values().asScala.toSeq
+      )
+
+      // Create input topic
+      TestUtils.createTopicWithAdmin(
+        admin = admin,
+        brokers = cluster.brokers.values().asScala.toSeq,
+        controllers = cluster.controllers().values().asScala.toSeq,
+        topic = inputTopicName,
+        numPartitions = 2
+      )
+
+      // Wait for topics to be available
+      TestUtils.waitUntilTrue(() => {
+        val topicNames = admin.listTopics().names().get()
+        topicNames.contains(inputTopicName)
+      }, msg = s"Input topic $inputTopicName is not available")
+
+      // Create topology with internal topics (changelog and repartition 
topics)
+      val topology = createTopologyWithInternalTopics(inputTopicName, groupId)
+
+      // Send heartbeat with topology containing internal topics
+      var streamsGroupHeartbeatResponse: StreamsGroupHeartbeatResponseData = 
null
+      TestUtils.waitUntilTrue(() => {
+        streamsGroupHeartbeatResponse = streamsGroupHeartbeat(
+          groupId = groupId,
+          memberId = memberId,
+          rebalanceTimeoutMs = 1000,
+          activeTasks = Option(streamsGroupHeartbeatResponse)
+            .map(r => convertTaskIds(r.activeTasks()))
+            .getOrElse(List.empty),
+          standbyTasks = Option(streamsGroupHeartbeatResponse)
+            .map(r => convertTaskIds(r.standbyTasks()))
+            .getOrElse(List.empty),
+          warmupTasks = Option(streamsGroupHeartbeatResponse)
+            .map(r => convertTaskIds(r.warmupTasks()))
+            .getOrElse(List.empty),
+          topology = topology
+        )
+        streamsGroupHeartbeatResponse.errorCode == Errors.NONE.code()
+      }, "StreamsGroupHeartbeatRequest did not succeed within the timeout 
period.")
+
+      // Verify the heartbeat was successful
+      assert(streamsGroupHeartbeatResponse != null, 
"StreamsGroupHeartbeatResponse should not be null")
+      assertEquals(memberId, streamsGroupHeartbeatResponse.memberId())
+      assertEquals(1, streamsGroupHeartbeatResponse.memberEpoch())
+
+      // Wait for internal topics to be created
+      val expectedChangelogTopic = s"$groupId-subtopology-1-changelog"
+      val expectedRepartitionTopic = s"$groupId-subtopology-1-repartition"
+
+      TestUtils.waitUntilTrue(() => {
+        val topicNames = admin.listTopics().names().get()
+        topicNames.contains(expectedChangelogTopic) && 
topicNames.contains(expectedRepartitionTopic)
+      }, msg = s"Internal topics $expectedChangelogTopic and 
$expectedRepartitionTopic were not created")
+
+      // Verify the internal topics exist and have correct properties
+      val changelogTopicDescription = 
admin.describeTopics(java.util.Collections.singletonList(expectedChangelogTopic)).allTopicNames().get()
+      val repartitionTopicDescription = 
admin.describeTopics(java.util.Collections.singletonList(expectedRepartitionTopic)).allTopicNames().get()
+
+      assertTrue(changelogTopicDescription.containsKey(expectedChangelogTopic),
+        s"Changelog topic $expectedChangelogTopic should exist")
+      
assertTrue(repartitionTopicDescription.containsKey(expectedRepartitionTopic),
+        s"Repartition topic $expectedRepartitionTopic should exist")
+
+      // Verify topic configurations
+      val changelogTopic = 
changelogTopicDescription.get(expectedChangelogTopic)
+      val repartitionTopic = 
repartitionTopicDescription.get(expectedRepartitionTopic)
+
+      // Both topics should have 2 partitions (matching the input topic)
+      assertEquals(2, changelogTopic.partitions().size(),
+        s"Changelog topic should have 2 partitions, but has 
${changelogTopic.partitions().size()}")
+      assertEquals(2, repartitionTopic.partitions().size(),
+        s"Repartition topic should have 2 partitions, but has 
${repartitionTopic.partitions().size()}")
+
+      // Verify replication factor
+      assertEquals(1, changelogTopic.partitions().get(0).replicas().size(),
+        s"Changelog topic should have replication factor 1")
+      assertEquals(1, repartitionTopic.partitions().get(0).replicas().size(),
+        s"Repartition topic should have replication factor 1")
+    } finally {
+      admin.close()
+    }
+  }
+
+  @ClusterTest
+  def testDynamicGroupConfig(): Unit = {
+    val admin = cluster.admin()
+    val memberId1 = "test-member-1"
+    val memberId2 = "test-member-2"
+    val groupId = "test-group"
+    val topicName = "test-topic"
+
+    try {
+      TestUtils.createOffsetsTopicWithAdmin(
+        admin = admin,
+        brokers = cluster.brokers.values().asScala.toSeq,
+        controllers = cluster.controllers().values().asScala.toSeq
+      )
+
+      // Create topic
+      TestUtils.createTopicWithAdmin(
+        admin = admin,
+        brokers = cluster.brokers.values().asScala.toSeq,
+        controllers = cluster.controllers().values().asScala.toSeq,
+        topic = topicName,
+        numPartitions = 2
+      )
+      // Wait for topic to be available
+      TestUtils.waitUntilTrue(() => {
+        admin.listTopics().names().get().contains(topicName)
+      }, msg = s"Topic $topicName is not available to the group coordinator")
+
+      val topology = createTopologyWithInternalTopics(topicName, groupId)
+
+      // First member joins the group
+      var streamsGroupHeartbeatResponse1: StreamsGroupHeartbeatResponseData = 
null
+      TestUtils.waitUntilTrue(() => {
+        streamsGroupHeartbeatResponse1 = streamsGroupHeartbeat(
+          groupId = groupId,
+          memberId = memberId1,
+          rebalanceTimeoutMs = 1000,
+          activeTasks = Option(streamsGroupHeartbeatResponse1)
+            .map(r => convertTaskIds(r.activeTasks()))
+            .getOrElse(List.empty),
+          standbyTasks = Option(streamsGroupHeartbeatResponse1)
+            .map(r => convertTaskIds(r.standbyTasks()))
+            .getOrElse(List.empty),
+          warmupTasks = Option(streamsGroupHeartbeatResponse1)
+            .map(r => convertTaskIds(r.warmupTasks()))
+            .getOrElse(List.empty),
+          topology = topology,
+          processId = "process-1"
+        )
+        streamsGroupHeartbeatResponse1.errorCode == Errors.NONE.code()
+      }, "First StreamsGroupHeartbeatRequest did not succeed within the 
timeout period.")
+
+      val expectedChangelogTopic = s"$groupId-subtopology-1-changelog"
+      val expectedRepartitionTopic = s"$groupId-subtopology-1-repartition"
+      TestUtils.waitUntilTrue(() => {
+        val topicNames = admin.listTopics().names().get()
+        topicNames.contains(expectedChangelogTopic) && 
topicNames.contains(expectedRepartitionTopic)
+      }, msg = s"Internal topics $expectedChangelogTopic or 
$expectedRepartitionTopic were not created")
+
+      // Second member joins the group
+      var streamsGroupHeartbeatResponse2: StreamsGroupHeartbeatResponseData = 
null
+      TestUtils.waitUntilTrue(() => {
+        streamsGroupHeartbeatResponse2 = streamsGroupHeartbeat(
+          groupId = groupId,
+          memberId = memberId2,
+          rebalanceTimeoutMs = 1000,
+          activeTasks = Option(streamsGroupHeartbeatResponse2)
+            .map(r => convertTaskIds(r.activeTasks()))
+            .getOrElse(List.empty),
+          standbyTasks = Option(streamsGroupHeartbeatResponse2)
+            .map(r => convertTaskIds(r.standbyTasks()))
+            .getOrElse(List.empty),
+          warmupTasks = Option(streamsGroupHeartbeatResponse2)
+            .map(r => convertTaskIds(r.warmupTasks()))
+            .getOrElse(List.empty),
+          topology = topology,
+          processId = "process-2"
+        )
+        streamsGroupHeartbeatResponse2.errorCode == Errors.NONE.code()
+      }, "Second StreamsGroupHeartbeatRequest did not succeed within the 
timeout period.")
+
+      // Both members continue to send heartbeats with their assigned tasks
+      TestUtils.waitUntilTrue(() => {
+        streamsGroupHeartbeatResponse1 = streamsGroupHeartbeat(
+          groupId = groupId,
+          memberId = memberId1,
+          memberEpoch = streamsGroupHeartbeatResponse1.memberEpoch(),
+          rebalanceTimeoutMs = 1000,
+          activeTasks = Option(streamsGroupHeartbeatResponse1)
+            .map(r => convertTaskIds(r.activeTasks()))
+            .getOrElse(List.empty),
+          standbyTasks = Option(streamsGroupHeartbeatResponse1)
+            .map(r => convertTaskIds(r.standbyTasks()))
+            .getOrElse(List.empty),
+          warmupTasks = Option(streamsGroupHeartbeatResponse1)
+            .map(r => convertTaskIds(r.warmupTasks()))
+            .getOrElse(List.empty),
+        )
+        streamsGroupHeartbeatResponse1.errorCode == Errors.NONE.code()
+      }, "First member rebalance heartbeat did not succeed within the timeout 
period.")
+
+      TestUtils.waitUntilTrue(() => {
+        streamsGroupHeartbeatResponse2 = streamsGroupHeartbeat(
+          groupId = groupId,
+          memberId = memberId2,
+          memberEpoch = streamsGroupHeartbeatResponse2.memberEpoch(),
+          rebalanceTimeoutMs = 1000,
+          activeTasks = Option(streamsGroupHeartbeatResponse1)
+            .map(r => convertTaskIds(r.activeTasks()))
+            .getOrElse(List.empty),
+          standbyTasks = Option(streamsGroupHeartbeatResponse1)
+            .map(r => convertTaskIds(r.standbyTasks()))
+            .getOrElse(List.empty),
+          warmupTasks = Option(streamsGroupHeartbeatResponse1)
+            .map(r => convertTaskIds(r.warmupTasks()))
+            .getOrElse(List.empty)
+        )
+        streamsGroupHeartbeatResponse2.errorCode == Errors.NONE.code()
+      }, "Second member rebalance heartbeat did not succeed within the timeout 
period.")
+
+      // Verify initial state with no standby tasks
+      assertEquals(0, streamsGroupHeartbeatResponse1.standbyTasks().size(), 
"Member 1 should have no standby tasks initially")
+      assertEquals(0, streamsGroupHeartbeatResponse1.standbyTasks().size(), 
"Member 2 should have no standby tasks initially")
+
+      // Change streams.num.standby.replicas = 1
+      val groupConfigResource = new ConfigResource(ConfigResource.Type.GROUP, 
groupId)
+      val alterConfigOp = new AlterConfigOp(
+        new ConfigEntry("streams.num.standby.replicas", "1"),
+        AlterConfigOp.OpType.SET
+      )
+      val configChanges = Map(groupConfigResource -> 
List(alterConfigOp).asJavaCollection).asJava
+      val options = new org.apache.kafka.clients.admin.AlterConfigsOptions()
+      admin.incrementalAlterConfigs(configChanges, options).all().get()
+
+      // Send heartbeats to trigger rebalance after config change
+      TestUtils.waitUntilTrue(() => {
+        streamsGroupHeartbeatResponse1 = streamsGroupHeartbeat(
+          groupId = groupId,
+          memberId = memberId1,
+          memberEpoch = streamsGroupHeartbeatResponse1.memberEpoch(),
+          rebalanceTimeoutMs = 1000,
+          activeTasks = List.empty,
+          standbyTasks = List.empty,
+          warmupTasks = List.empty
+        )
+        streamsGroupHeartbeatResponse1.errorCode == Errors.NONE.code() &&
+          streamsGroupHeartbeatResponse1.standbyTasks()!= null
+      }, "First member heartbeat after config change did not succeed within 
the timeout period.")
+
+      TestUtils.waitUntilTrue(() => {
+        streamsGroupHeartbeatResponse2 = streamsGroupHeartbeat(
+          groupId = groupId,
+          memberId = memberId2,
+          memberEpoch = streamsGroupHeartbeatResponse2.memberEpoch(),
+          rebalanceTimeoutMs = 1000,
+          activeTasks = List.empty,
+          standbyTasks = List.empty,
+          warmupTasks = List.empty
+        )
+        streamsGroupHeartbeatResponse2.errorCode == Errors.NONE.code() &&
+          streamsGroupHeartbeatResponse2.standbyTasks() != null
+      }, "Second member heartbeat after config change did not succeed within 
the timeout period.")
+
+      // Verify that at least one member has active tasks
+      val member1ActiveTasksNum = 
streamsGroupHeartbeatResponse1.activeTasks().size()
+      val member2ActiveTasksNum = 
streamsGroupHeartbeatResponse2.activeTasks().size()
+      assertTrue(member1ActiveTasksNum + member2ActiveTasksNum > 0, "At least 
one member should have active tasks after config change")
+
+      // Verify that at least one member has standby tasks
+      val member1StandbyTasksNum = 
streamsGroupHeartbeatResponse1.standbyTasks().size()
+      val member2StandbyTasksNum = 
streamsGroupHeartbeatResponse2.standbyTasks().size()
+      assertTrue(member1StandbyTasksNum + member2StandbyTasksNum > 0, "At 
least one member should have standby tasks after config change")
+
+      // With 2 members and streams.num.standby.replicas=1, each active task 
should have 1 standby task
+      val totalActiveTasks = member1ActiveTasksNum + member2ActiveTasksNum
+      val totalStandbyTasks = member1StandbyTasksNum + member2StandbyTasksNum
+      assertEquals(totalActiveTasks, totalStandbyTasks, "Each active task 
should have one standby task")
+
+    } finally {
+      admin.close()
+    }
+  }
+
+  @ClusterTest(
+    types = Array(Type.KRAFT),
+    serverProperties = Array(
+      new ClusterConfigProperty(key = "group.streams.heartbeat.interval.ms", 
value = "500"),
+      new ClusterConfigProperty(key = 
"group.streams.min.heartbeat.interval.ms", value = "500"),
+      new ClusterConfigProperty(key = "group.streams.session.timeout.ms", 
value = "501"),
+      new ClusterConfigProperty(key = "group.streams.min.session.timeout.ms", 
value = "501")
+    )
+  )
+  def testMemberJoiningAndExpiring(): Unit = {
+    val admin = cluster.admin()
+    val memberId = "test-member-1"
+    val groupId = "test-group"
+    val topicName = "test-topic"
+
+    try {
+      TestUtils.createOffsetsTopicWithAdmin(
+        admin = admin,
+        brokers = cluster.brokers.values().asScala.toSeq,
+        controllers = cluster.controllers().values().asScala.toSeq
+      )
+
+      // Create topic
+      TestUtils.createTopicWithAdmin(
+        admin = admin,
+        brokers = cluster.brokers.values().asScala.toSeq,
+        controllers = cluster.controllers().values().asScala.toSeq,
+        topic = topicName,
+        numPartitions = 2
+      )
+      TestUtils.waitUntilTrue(() => {
+        admin.listTopics().names().get().contains(topicName)
+      }, msg = s"Topic $topicName is not available to the group coordinator")
+
+      val topology = createMockTopology(topicName)
+
+      // First member joins the group
+      var streamsGroupHeartbeatResponse: StreamsGroupHeartbeatResponseData = 
null
+      TestUtils.waitUntilTrue(() => {
+        streamsGroupHeartbeatResponse = streamsGroupHeartbeat(
+          groupId = groupId,
+          memberId = memberId,
+          rebalanceTimeoutMs = 1000,
+          activeTasks = List.empty,
+          standbyTasks = List.empty,
+          warmupTasks = List.empty,
+          topology = topology
+        )
+        streamsGroupHeartbeatResponse.errorCode == Errors.NONE.code()
+      }, "StreamsGroupHeartbeatRequest did not succeed within the timeout 
period.")
+
+      val memberEpoch = streamsGroupHeartbeatResponse.memberEpoch()
+      assertEquals(1, memberEpoch)
+
+      // Blocking the thread for 1 sec so that the session times out and the 
member needs to rejoin
+      Thread.sleep(1000)
+
+      // Prepare the next heartbeat which should fail due to member expiration
+      TestUtils.waitUntilTrue(() => {
+        val expiredResponse = streamsGroupHeartbeat(
+          groupId = groupId,
+          memberId = memberId,
+          memberEpoch = memberEpoch,
+          rebalanceTimeoutMs = 1000,
+          activeTasks = 
convertTaskIds(streamsGroupHeartbeatResponse.activeTasks()),
+          standbyTasks = 
convertTaskIds(streamsGroupHeartbeatResponse.standbyTasks()),
+          warmupTasks = 
convertTaskIds(streamsGroupHeartbeatResponse.warmupTasks()),
+          expectedError = Errors.UNKNOWN_MEMBER_ID
+        )
+        expiredResponse.errorCode == Errors.UNKNOWN_MEMBER_ID.code() &&
+          expiredResponse.memberEpoch() == 0 &&
+          expiredResponse.errorMessage().equals(s"Member $memberId is not a 
member of group $groupId.")
+      }, "Member should have been expired because of the timeout.")
+
+      // Member sends heartbeat again to join the streams group
+      var rejoinHeartbeatResponse: StreamsGroupHeartbeatResponseData = null
+      TestUtils.waitUntilTrue(() => {
+        rejoinHeartbeatResponse = streamsGroupHeartbeat(
+          groupId = groupId,
+          memberId = memberId,
+          rebalanceTimeoutMs = 1000,
+          activeTasks = Option(rejoinHeartbeatResponse)
+            .map(r => convertTaskIds(r.activeTasks()))
+            .getOrElse(List.empty),
+          standbyTasks = Option(rejoinHeartbeatResponse)
+            .map(r => convertTaskIds(r.standbyTasks()))
+            .getOrElse(List.empty),
+          warmupTasks = Option(rejoinHeartbeatResponse)
+            .map(r => convertTaskIds(r.warmupTasks()))
+            .getOrElse(List.empty),
+          topology = topology
+        )
+        rejoinHeartbeatResponse.errorCode == Errors.NONE.code()
+      }, "Member rejoin did not succeed within the timeout period.")
+
+      // Verify the response for rejoined member
+      assert(rejoinHeartbeatResponse != null, "Rejoin 
StreamsGroupHeartbeatResponse should not be null")
+      assertEquals(memberId, rejoinHeartbeatResponse.memberId())
+      assertTrue(rejoinHeartbeatResponse.memberEpoch() > memberEpoch, "Epoch 
should have been bumped when member rejoined")
+      val expectedActiveTasks = List(
+        new StreamsGroupHeartbeatResponseData.TaskIds()
+          .setSubtopologyId("subtopology-1")
+          .setPartitions(List(0, 1).map(_.asInstanceOf[Integer]).asJava)
+      ).asJava
+      assertEquals(expectedActiveTasks, rejoinHeartbeatResponse.activeTasks())
+      assertEquals(0, rejoinHeartbeatResponse.standbyTasks().size(), "There 
should be no standby tasks assigned")
+      assertEquals(0, rejoinHeartbeatResponse.warmupTasks().size(), "There 
should be no warmup tasks assigned")
+
+    } finally {
+      admin.close()
+    }
+  }
+
+  @ClusterTest
+  def testGroupCoordinatorChange(): Unit = {
+    val admin = cluster.admin()
+    val memberId = "test-member-1"
+    val groupId = "test-group"
+    val topicName = "test-topic"
+
+    try {
+      TestUtils.createOffsetsTopicWithAdmin(
+        admin = admin,
+        brokers = cluster.brokers.values().asScala.toSeq,
+        controllers = cluster.controllers().values().asScala.toSeq
+      )
+
+      // Create topic
+      TestUtils.createTopicWithAdmin(
+        admin = admin,
+        brokers = cluster.brokers.values().asScala.toSeq,
+        controllers = cluster.controllers().values().asScala.toSeq,
+        topic = topicName,
+        numPartitions = 2
+      )
+      TestUtils.waitUntilTrue(() => {
+        admin.listTopics().names().get().contains(topicName)
+      }, msg = s"Topic $topicName is not available to the group coordinator")
+
+      val topology = createMockTopology(topicName)
+
+      // First member joins the group
+      var streamsGroupHeartbeatResponse: StreamsGroupHeartbeatResponseData = 
null
+      TestUtils.waitUntilTrue(() => {
+        streamsGroupHeartbeatResponse = streamsGroupHeartbeat(
+          groupId = groupId,
+          memberId = memberId,
+          rebalanceTimeoutMs = 1000,
+          activeTasks = Option(streamsGroupHeartbeatResponse)
+            .map(r => convertTaskIds(r.activeTasks()))
+            .getOrElse(List.empty),
+          standbyTasks = Option(streamsGroupHeartbeatResponse)
+            .map(r => convertTaskIds(r.standbyTasks()))
+            .getOrElse(List.empty),
+          warmupTasks = Option(streamsGroupHeartbeatResponse)
+            .map(r => convertTaskIds(r.warmupTasks()))
+            .getOrElse(List.empty),
+          topology = topology
+        )
+        streamsGroupHeartbeatResponse.errorCode == Errors.NONE.code()
+      }, "StreamsGroupHeartbeatRequest did not succeed within the timeout 
period.")
+
+      // Verify the response for member
+      assert(streamsGroupHeartbeatResponse != null, 
"StreamsGroupHeartbeatResponse should not be null")
+      assertEquals(memberId, streamsGroupHeartbeatResponse.memberId())
+      assertEquals(1, streamsGroupHeartbeatResponse.memberEpoch())
+      assertNotNull(streamsGroupHeartbeatResponse.activeTasks())
+
+      // Restart the only running broker
+      val broker = cluster.brokers().values().iterator().next()
+      cluster.shutdownBroker(broker.config.brokerId)
+      cluster.startBroker(broker.config.brokerId)
+
+      // Should receive no error and no assignment changes
+      TestUtils.waitUntilTrue(() => {
+        streamsGroupHeartbeatResponse = streamsGroupHeartbeat(
+          groupId = groupId,
+          memberId = memberId,
+          memberEpoch = streamsGroupHeartbeatResponse.memberEpoch(),
+          rebalanceTimeoutMs = 1000,
+          activeTasks = 
convertTaskIds(streamsGroupHeartbeatResponse.activeTasks()),
+          standbyTasks = 
convertTaskIds(streamsGroupHeartbeatResponse.standbyTasks()),
+          warmupTasks = 
convertTaskIds(streamsGroupHeartbeatResponse.warmupTasks())
+        )
+        streamsGroupHeartbeatResponse.errorCode == Errors.NONE.code()
+      }, "StreamsGroupHeartbeatRequest after broker restart did not succeed 
within the timeout period.")
+
+      // Verify the response. Epoch should not have changed and null 
assignments determine that no
+      // change in old assignment
+      assertEquals(memberId, streamsGroupHeartbeatResponse.memberId())
+      assertEquals(1, streamsGroupHeartbeatResponse.memberEpoch())
+      assertNull(streamsGroupHeartbeatResponse.activeTasks())
+      assertNull(streamsGroupHeartbeatResponse.standbyTasks())
+      assertNull(streamsGroupHeartbeatResponse.warmupTasks())
+
+    } finally {
+      admin.close()
+    }
+  }
+
   private def convertTaskIds(responseTasks: 
java.util.List[StreamsGroupHeartbeatResponseData.TaskIds]): 
List[StreamsGroupHeartbeatRequestData.TaskIds] = {
     if (responseTasks == null) {
-      List.empty
+      List()
     } else {
       responseTasks.asScala.map { responseTask =>
         new StreamsGroupHeartbeatRequestData.TaskIds()
@@ -533,7 +1015,27 @@ class StreamsGroupHeartbeatRequestTest(cluster: ClusterInstance) extends GroupCo
           .setSourceTopics(List(topicName).asJava)
           .setRepartitionSinkTopics(List.empty.asJava)
           .setRepartitionSourceTopics(List.empty.asJava)
-          .setStateChangelogTopics(List.empty.asJava)
+      ).asJava)
+  }
+
+  private def createTopologyWithInternalTopics(inputTopicName: String, 
groupId: String): StreamsGroupHeartbeatRequestData.Topology = {
+    new StreamsGroupHeartbeatRequestData.Topology()
+      .setEpoch(1)
+      .setSubtopologies(List(
+        new StreamsGroupHeartbeatRequestData.Subtopology()
+          .setSubtopologyId("subtopology-1")
+          .setSourceTopics(List(inputTopicName).asJava)
+          .setRepartitionSinkTopics(List(
+            s"$groupId-subtopology-1-repartition"
+          ).asJava)
+          .setRepartitionSourceTopics(List(
+            new StreamsGroupHeartbeatRequestData.TopicInfo()
+              .setName(s"$groupId-subtopology-1-repartition")
+          ).asJava)
+          .setStateChangelogTopics(List(
+            new StreamsGroupHeartbeatRequestData.TopicInfo()
+              .setName(s"$groupId-subtopology-1-changelog")
+          ).asJava)
       ).asJava)
   }
 }
\ No newline at end of file

Reply via email to