This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 42604b46c2e KAFKA-19807: Add RPC-level integration tests for
StreamsGroupHeartbeat [1/2] (#20757)
42604b46c2e is described below
commit 42604b46c2e33a0a9205ca22fc7d542b24926d0f
Author: lucliu1108 <[email protected]>
AuthorDate: Thu Oct 30 09:21:42 2025 -0500
KAFKA-19807: Add RPC-level integration tests for StreamsGroupHeartbeat
[1/2] (#20757)
## What
Ticket: https://issues.apache.org/jira/browse/KAFKA-19807
Add integration test similar to `ShareGroupHeartbeatRequestTest` and
`ConsumerGroupHeartbeatRequestTest` for `StreamsGroupHeartbeat`
Reviewers: Lucas Brutschy <[email protected]>
---------
Co-authored-by: Lucas Brutschy <[email protected]>
Co-authored-by: Copilot <[email protected]>
---
.../server/GroupCoordinatorBaseRequestTest.scala | 2 +
.../server/StreamsGroupHeartbeatRequestTest.scala | 539 +++++++++++++++++++++
2 files changed, 541 insertions(+)
diff --git
a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
index 0b96a8355fc..58ea7d92b0d 100644
---
a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
+++
b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
@@ -880,6 +880,7 @@ class GroupCoordinatorBaseRequestTest(cluster:
ClusterInstance) {
warmupTasks: List[StreamsGroupHeartbeatRequestData.TaskIds] = null,
topology: StreamsGroupHeartbeatRequestData.Topology = null,
expectedError: Errors = Errors.NONE,
+ processId: String = null,
version: Short =
ApiKeys.STREAMS_GROUP_HEARTBEAT.latestVersion(isUnstableApiEnabled)
): StreamsGroupHeartbeatResponseData = {
val streamsGroupHeartbeatRequest = new
StreamsGroupHeartbeatRequest.Builder(
@@ -892,6 +893,7 @@ class GroupCoordinatorBaseRequestTest(cluster:
ClusterInstance) {
.setStandbyTasks(standbyTasks.asJava)
.setWarmupTasks(warmupTasks.asJava)
.setTopology(topology)
+ .setProcessId(processId)
).build(version)
// Send the request until receiving a successful response. There is a delay
diff --git
a/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala
b/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala
new file mode 100644
index 00000000000..e3b73a7ff6d
--- /dev/null
+++
b/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala
@@ -0,0 +1,539 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.utils.TestUtils
+import org.apache.kafka.common.message.{StreamsGroupHeartbeatRequestData,
StreamsGroupHeartbeatResponseData}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.test.ClusterInstance
+import org.apache.kafka.common.test.api.{ClusterConfigProperty,
ClusterFeature, ClusterTest, ClusterTestDefaults, Type}
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
+import org.apache.kafka.common.errors.UnsupportedVersionException
+import org.apache.kafka.server.common.Feature
+import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull,
assertThrows, assertTrue}
+
+import scala.jdk.CollectionConverters._
+
+@ClusterTestDefaults(
+ types = Array(Type.KRAFT),
+ serverProperties = Array(
+ new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+ new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
+ new ClusterConfigProperty(key = "unstable.api.versions.enable", value =
"true")
+ )
+)
+class StreamsGroupHeartbeatRequestTest(cluster: ClusterInstance) extends
GroupCoordinatorBaseRequestTest(cluster) {
+
+ @ClusterTest(
+ serverProperties = Array(
+ new ClusterConfigProperty(key =
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value =
"classic,consumer,streams"),
+ )
+ )
+ def testStreamsGroupHeartbeatWithInvalidAPIVersion(): Unit = {
+ // Test that invalid API version throws UnsupportedVersionException
+ assertThrows(classOf[UnsupportedVersionException], () =>
+ streamsGroupHeartbeat(
+ groupId = "test-group",
+ expectedError = Errors.UNSUPPORTED_VERSION,
+ version = -1)
+ )
+ }
+
+ @ClusterTest(
+ serverProperties = Array(
+ new ClusterConfigProperty(key =
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value =
"classic,consumer,streams"),
+ ),
+ features = Array(
+ new ClusterFeature(feature = Feature.STREAMS_VERSION, version = 0)
+ )
+ )
+ def testStreamsGroupHeartbeatIsInaccessibleWhenDisabledByFeatureConfig():
Unit = {
+ // Test with streams.version = 0, the API is disabled at server level
+ val topology = new StreamsGroupHeartbeatRequestData.Topology()
+ .setEpoch(1)
+ .setSubtopologies(List().asJava)
+
+ val streamsGroupHeartbeatResponse = streamsGroupHeartbeat(
+ groupId = "test-group",
+ memberId = "test-member",
+ rebalanceTimeoutMs = 1000,
+ activeTasks = List.empty,
+ standbyTasks = List.empty,
+ warmupTasks = List.empty,
+ topology = topology,
+ expectedError = Errors.UNSUPPORTED_VERSION,
+ )
+
+ val expectedResponse = new
StreamsGroupHeartbeatResponseData().setErrorCode(Errors.UNSUPPORTED_VERSION.code())
+ assertEquals(expectedResponse, streamsGroupHeartbeatResponse)
+ }
+
+ @ClusterTest(
+ serverProperties = Array(
+ new ClusterConfigProperty(key =
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value =
"classic,consumer"),
+ )
+ )
+ def
testStreamsGroupHeartbeatIsInaccessibleWhenDisabledByStaticGroupCoordinatorProtocolConfig():
Unit = {
+ val topology = new StreamsGroupHeartbeatRequestData.Topology()
+ .setEpoch(1)
+ .setSubtopologies(List().asJava)
+
+ val streamsGroupHeartbeatResponse = streamsGroupHeartbeat(
+ groupId = "test-group",
+ memberId = "test-member",
+ rebalanceTimeoutMs = 1000,
+ activeTasks = List.empty,
+ standbyTasks = List.empty,
+ warmupTasks = List.empty,
+ topology = topology,
+ expectedError = Errors.UNSUPPORTED_VERSION
+ )
+
+ val expectedResponse = new
StreamsGroupHeartbeatResponseData().setErrorCode(Errors.UNSUPPORTED_VERSION.code())
+ assertEquals(expectedResponse, streamsGroupHeartbeatResponse)
+ }
+
+ @ClusterTest(
+ serverProperties = Array(
+ new ClusterConfigProperty(key =
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value =
"classic,consumer,streams"),
+ )
+ )
+ def
testStreamsGroupHeartbeatIsInaccessibleWhenUnstableLatestVersionNotEnabled():
Unit = {
+ val topology = new StreamsGroupHeartbeatRequestData.Topology()
+ .setEpoch(1)
+ .setSubtopologies(List().asJava)
+
+ val streamsGroupHeartbeatResponse = streamsGroupHeartbeat(
+ groupId = "test-group",
+ memberId = "test-member",
+ rebalanceTimeoutMs = 1000,
+ activeTasks = List.empty,
+ standbyTasks = List.empty,
+ warmupTasks = List.empty,
+ topology = topology,
+ expectedError = Errors.NOT_COORDINATOR
+ )
+
+ val expectedResponse = new
StreamsGroupHeartbeatResponseData().setErrorCode(Errors.NOT_COORDINATOR.code())
+ assertEquals(expectedResponse, streamsGroupHeartbeatResponse)
+ }
+
+ @ClusterTest
+ def
testStreamsGroupHeartbeatIsAccessibleWhenNewGroupCoordinatorIsEnabledTopicNotExistFirst():
Unit = {
+ val admin = cluster.admin()
+ val memberId = "test-member"
+ val groupId = "test-group"
+ val topicName = "test-topic"
+
+ // Creates the __consumer_offsets topics because it won't be created
automatically
+ // in this test because it does not use FindCoordinator API.
+ try {
+ TestUtils.createOffsetsTopicWithAdmin(
+ admin = admin,
+ brokers = cluster.brokers.values().asScala.toSeq,
+ controllers = cluster.controllers().values().asScala.toSeq
+ )
+
+ val topology = createMockTopology(topicName)
+
+ // Heartbeat when topic does not exist
+ var streamsGroupHeartbeatResponse: StreamsGroupHeartbeatResponseData =
null
+ TestUtils.waitUntilTrue(() => {
+ streamsGroupHeartbeatResponse = streamsGroupHeartbeat(
+ groupId = groupId,
+ memberId = memberId,
+ rebalanceTimeoutMs = 1000,
+ activeTasks = List.empty,
+ standbyTasks = List.empty,
+ warmupTasks = List.empty,
+ topology = topology
+ )
+ streamsGroupHeartbeatResponse.errorCode == Errors.NONE.code()
+ }, "StreamsGroupHeartbeatRequest did not succeed within the timeout
period.")
+
+ // Verify the response
+ assertNotNull(streamsGroupHeartbeatResponse,
"StreamsGroupHeartbeatResponse should not be null")
+ assertEquals(memberId, streamsGroupHeartbeatResponse.memberId())
+ assertEquals(1, streamsGroupHeartbeatResponse.memberEpoch())
+ val expectedStatus = new StreamsGroupHeartbeatResponseData.Status()
+ .setStatusCode(1)
+ .setStatusDetail(s"Source topics $topicName are missing.")
+ assertEquals(expectedStatus,
streamsGroupHeartbeatResponse.status().get(0))
+
+ // Create topic
+ TestUtils.createTopicWithAdmin(
+ admin = admin,
+ brokers = cluster.brokers.values().asScala.toSeq,
+ controllers = cluster.controllers().values().asScala.toSeq,
+ topic = topicName,
+ numPartitions = 3
+ )
+ // Wait for topic to be available
+ TestUtils.waitUntilTrue(() => {
+ admin.listTopics().names().get().contains(topicName)
+ }, msg = s"Topic $topicName is not available to the group coordinator")
+
+ // Heartbeat after topic is created
+ TestUtils.waitUntilTrue(() => {
+ streamsGroupHeartbeatResponse = streamsGroupHeartbeat(
+ groupId = groupId,
+ memberId = memberId,
+ rebalanceTimeoutMs = 1000,
+ activeTasks = List.empty,
+ standbyTasks = List.empty,
+ warmupTasks = List.empty,
+ topology = topology
+ )
+ streamsGroupHeartbeatResponse.errorCode == Errors.NONE.code()
+ }, "StreamsGroupHeartbeatRequest did not succeed within the timeout
period.")
+
+ // Active task assignment should be available
+ assertNotNull(streamsGroupHeartbeatResponse,
"StreamsGroupHeartbeatResponse should not be null")
+ assertEquals(memberId, streamsGroupHeartbeatResponse.memberId())
+ assertEquals(2, streamsGroupHeartbeatResponse.memberEpoch())
+ assertEquals(null, streamsGroupHeartbeatResponse.status())
+ val expectedActiveTasks = List(
+ new StreamsGroupHeartbeatResponseData.TaskIds()
+ .setSubtopologyId("subtopology-1")
+ .setPartitions(List(0, 1, 2).map(_.asInstanceOf[Integer]).asJava)
+ ).asJava
+ assertEquals(expectedActiveTasks,
streamsGroupHeartbeatResponse.activeTasks())
+ } finally {
+ admin.close()
+ }
+ }
+
+ @ClusterTest
+ def testStreamsGroupHeartbeatForMultipleMembers(): Unit = {
+ val admin = cluster.admin()
+ val memberId1 = "test-member-1"
+ val memberId2 = "test-member-2"
+ val groupId = "test-group"
+ val topicName = "test-topic"
+
+ // Creates the __consumer_offsets topics because it won't be created
automatically
+ // in this test because it does not use FindCoordinator API.
+ try {
+ TestUtils.createOffsetsTopicWithAdmin(
+ admin = admin,
+ brokers = cluster.brokers.values().asScala.toSeq,
+ controllers = cluster.controllers().values().asScala.toSeq
+ )
+
+ // Create topic
+ TestUtils.createTopicWithAdmin(
+ admin = admin,
+ brokers = cluster.brokers.values().asScala.toSeq,
+ controllers = cluster.controllers().values().asScala.toSeq,
+ topic = topicName,
+ numPartitions = 3
+ )
+ TestUtils.waitUntilTrue(() => {
+ admin.listTopics().names().get().contains(topicName)
+ }, msg = s"Topic $topicName is not available to the group coordinator")
+
+ val topology = createMockTopology(topicName)
+
+ // First member joins the group
+ var streamsGroupHeartbeatResponse1: StreamsGroupHeartbeatResponseData =
null
+ TestUtils.waitUntilTrue(() => {
+ streamsGroupHeartbeatResponse1 = streamsGroupHeartbeat(
+ groupId = groupId,
+ memberId = memberId1,
+ rebalanceTimeoutMs = 1000,
+ activeTasks = Option(streamsGroupHeartbeatResponse1)
+ .map(r => convertTaskIds(r.activeTasks()))
+ .getOrElse(List.empty),
+ standbyTasks = Option(streamsGroupHeartbeatResponse1)
+ .map(r => convertTaskIds(r.standbyTasks()))
+ .getOrElse(List.empty),
+ warmupTasks = Option(streamsGroupHeartbeatResponse1)
+ .map(r => convertTaskIds(r.warmupTasks()))
+ .getOrElse(List.empty),
+ topology = topology
+ )
+ streamsGroupHeartbeatResponse1.errorCode == Errors.NONE.code()
+ }, "First StreamsGroupHeartbeatRequest did not succeed within the
timeout period.")
+
+ // Verify first member gets all tasks initially
+ assertNotNull(streamsGroupHeartbeatResponse1,
"StreamsGroupHeartbeatResponse should not be null")
+ assertEquals(memberId1, streamsGroupHeartbeatResponse1.memberId())
+ assertEquals(1, streamsGroupHeartbeatResponse1.memberEpoch())
+ assertEquals(1, streamsGroupHeartbeatResponse1.activeTasks().size())
+ assertEquals(3,
streamsGroupHeartbeatResponse1.activeTasks().get(0).partitions().size())
+
+ // Second member joins the group (should trigger a rebalance)
+ var streamsGroupHeartbeatResponse2: StreamsGroupHeartbeatResponseData =
null
+ TestUtils.waitUntilTrue(() => {
+ streamsGroupHeartbeatResponse2 = streamsGroupHeartbeat(
+ groupId = groupId,
+ memberId = memberId2,
+ rebalanceTimeoutMs = 1000,
+ activeTasks = Option(streamsGroupHeartbeatResponse2)
+ .map(r => convertTaskIds(r.activeTasks()))
+ .getOrElse(List.empty),
+ standbyTasks = Option(streamsGroupHeartbeatResponse2)
+ .map(r => convertTaskIds(r.standbyTasks()))
+ .getOrElse(List.empty),
+ warmupTasks = Option(streamsGroupHeartbeatResponse2)
+ .map(r => convertTaskIds(r.warmupTasks()))
+ .getOrElse(List.empty),
+ topology = topology
+ )
+ streamsGroupHeartbeatResponse2.errorCode == Errors.NONE.code()
+ }, "Second StreamsGroupHeartbeatRequest did not succeed within the
timeout period.")
+
+ // Verify second member gets assigned
+ assertNotNull(streamsGroupHeartbeatResponse2,
"StreamsGroupHeartbeatResponse should not be null")
+ assertEquals(memberId2, streamsGroupHeartbeatResponse2.memberId())
+ assertEquals(2, streamsGroupHeartbeatResponse2.memberEpoch())
+
+ // Wait for both members to get their task assignments by sending
heartbeats
+ // until they both have non-null activeTasks
+ TestUtils.waitUntilTrue(() => {
+ streamsGroupHeartbeatResponse1 = streamsGroupHeartbeat(
+ groupId = groupId,
+ memberId = memberId1,
+ memberEpoch = streamsGroupHeartbeatResponse1.memberEpoch(),
+ rebalanceTimeoutMs = 1000,
+ activeTasks =
convertTaskIds(streamsGroupHeartbeatResponse1.activeTasks()),
+ standbyTasks =
convertTaskIds(streamsGroupHeartbeatResponse1.standbyTasks()),
+ warmupTasks =
convertTaskIds(streamsGroupHeartbeatResponse1.warmupTasks())
+ )
+ streamsGroupHeartbeatResponse1.errorCode == Errors.NONE.code() &&
+ streamsGroupHeartbeatResponse1.activeTasks() != null
+ }, "First member did not get task assignment within the timeout period.")
+
+ TestUtils.waitUntilTrue(() => {
+ streamsGroupHeartbeatResponse2 = streamsGroupHeartbeat(
+ groupId = groupId,
+ memberId = memberId2,
+ memberEpoch = streamsGroupHeartbeatResponse2.memberEpoch(),
+ rebalanceTimeoutMs = 1000,
+ activeTasks =
convertTaskIds(streamsGroupHeartbeatResponse2.activeTasks()),
+ standbyTasks =
convertTaskIds(streamsGroupHeartbeatResponse2.standbyTasks()),
+ warmupTasks =
convertTaskIds(streamsGroupHeartbeatResponse2.warmupTasks())
+ )
+ streamsGroupHeartbeatResponse2.errorCode == Errors.NONE.code() &&
+ streamsGroupHeartbeatResponse2.activeTasks() != null
+ }, "Second member did not get task assignment within the timeout
period.")
+
+
+ // Verify both members should have tasks assigned
+ assertNotNull(streamsGroupHeartbeatResponse1,
"StreamsGroupHeartbeatResponse should not be null")
+ assertEquals(memberId1, streamsGroupHeartbeatResponse1.memberId())
+
+ assertNotNull(streamsGroupHeartbeatResponse2,
"StreamsGroupHeartbeatResponse should not be null")
+ assertEquals(memberId2, streamsGroupHeartbeatResponse2.memberId())
+
+ // At least one member should have active tasks
+ val totalActiveTasks =
streamsGroupHeartbeatResponse1.activeTasks().size() +
streamsGroupHeartbeatResponse2.activeTasks().size()
+ assertTrue(totalActiveTasks > 0, "At least one member should have active
tasks")
+
+ } finally {
+ admin.close()
+ }
+ }
+
+ @ClusterTest
+ def testEmptyStreamsGroupId(): Unit = {
+ val admin = cluster.admin()
+
+ try {
+ TestUtils.createOffsetsTopicWithAdmin(
+ admin = admin,
+ brokers = cluster.brokers.values().asScala.toSeq,
+ controllers = cluster.controllers().values().asScala.toSeq
+ )
+
+ val topology = new StreamsGroupHeartbeatRequestData.Topology()
+ .setEpoch(1)
+ .setSubtopologies(List().asJava)
+
+ val streamsGroupHeartbeatResponse = streamsGroupHeartbeat(
+ groupId = "", // Empty group ID
+ memberId = "test-member",
+ rebalanceTimeoutMs = 1000,
+ activeTasks = List.empty,
+ standbyTasks = List.empty,
+ warmupTasks = List.empty,
+ topology = topology,
+ expectedError = Errors.INVALID_REQUEST
+ )
+
+ val expectedResponse = new StreamsGroupHeartbeatResponseData()
+ .setErrorCode(Errors.INVALID_REQUEST.code())
+ .setErrorMessage("GroupId can't be empty.")
+ assertEquals(expectedResponse, streamsGroupHeartbeatResponse)
+ } finally {
+ admin.close()
+ }
+ }
+
+ @ClusterTest
+ def testMemberLeaveHeartbeat(): Unit = {
+ val admin = cluster.admin()
+ val topicName = "test-topic"
+
+ try {
+ TestUtils.createOffsetsTopicWithAdmin(
+ admin = admin,
+ brokers = cluster.brokers.values().asScala.toSeq,
+ controllers = cluster.controllers().values().asScala.toSeq
+ )
+
+ // Create topic
+ TestUtils.createTopicWithAdmin(
+ admin = admin,
+ brokers = cluster.brokers.values().asScala.toSeq,
+ controllers = cluster.controllers().values().asScala.toSeq,
+ topic = topicName,
+ numPartitions = 3
+ )
+ TestUtils.waitUntilTrue(() => {
+ admin.listTopics().names().get().contains(topicName)
+ }, msg = s"Topic $topicName is not available to the group coordinator")
+
+ val topology = createMockTopology(topicName)
+
+ // Join group
+ var streamsGroupHeartbeatResponse: StreamsGroupHeartbeatResponseData =
null
+ TestUtils.waitUntilTrue(() => {
+ streamsGroupHeartbeatResponse = streamsGroupHeartbeat(
+ groupId = "test-group",
+ memberId = "test-member",
+ rebalanceTimeoutMs = 1000,
+ activeTasks = List.empty,
+ standbyTasks = List.empty,
+ warmupTasks = List.empty,
+ topology = topology
+ )
+ streamsGroupHeartbeatResponse.errorCode == Errors.NONE.code()
+ }, "StreamsGroupHeartbeatRequest did not succeed within the timeout
period.")
+
+ // Verify the member joined successfully
+ assertNotNull(streamsGroupHeartbeatResponse,
"StreamsGroupHeartbeatResponse should not be null")
+ assertEquals("test-member", streamsGroupHeartbeatResponse.memberId())
+ assertEquals(1, streamsGroupHeartbeatResponse.memberEpoch())
+
+ // Send a leave request
+ streamsGroupHeartbeatResponse = streamsGroupHeartbeat(
+ groupId = "test-group",
+ memberId = streamsGroupHeartbeatResponse.memberId(),
+ memberEpoch = -1, // LEAVE_GROUP_MEMBER_EPOCH
+ rebalanceTimeoutMs = 1000,
+ activeTasks = List.empty,
+ standbyTasks = List.empty,
+ warmupTasks = List.empty
+ )
+
+ // Verify the leave request was successful
+ assertEquals(Errors.NONE.code(),
streamsGroupHeartbeatResponse.errorCode())
+ assertEquals("test-member", streamsGroupHeartbeatResponse.memberId())
+ assertEquals(-1, streamsGroupHeartbeatResponse.memberEpoch())
+
+ } finally {
+ admin.close()
+ }
+ }
+
+ @ClusterTest
+ def testInvalidMemberEpoch(): Unit = {
+ val admin = cluster.admin()
+ val topicName = "test-topic"
+
+ try {
+ TestUtils.createOffsetsTopicWithAdmin(
+ admin = admin,
+ brokers = cluster.brokers.values().asScala.toSeq,
+ controllers = cluster.controllers().values().asScala.toSeq
+ )
+
+ // Create topic
+ TestUtils.createTopicWithAdmin(
+ admin = admin,
+ brokers = cluster.brokers.values().asScala.toSeq,
+ controllers = cluster.controllers().values().asScala.toSeq,
+ topic = topicName,
+ numPartitions = 3
+ )
+ TestUtils.waitUntilTrue(() => {
+ admin.listTopics().names().get().contains(topicName)
+ }, msg = s"Topic $topicName is not available to the group coordinator")
+
+ val topology = createMockTopology(topicName)
+
+ var streamsGroupHeartbeatResponse: StreamsGroupHeartbeatResponseData =
null
+ TestUtils.waitUntilTrue(() => {
+ streamsGroupHeartbeatResponse = streamsGroupHeartbeat(
+ groupId = "test-group",
+ memberId = "test-member",
+ rebalanceTimeoutMs = 1000,
+ activeTasks = List.empty,
+ standbyTasks = List.empty,
+ warmupTasks = List.empty,
+ topology = topology
+ )
+ streamsGroupHeartbeatResponse.errorCode == Errors.NONE.code()
+ }, "StreamsGroupHeartbeatRequest did not succeed within the timeout
period.")
+
+ streamsGroupHeartbeatResponse = streamsGroupHeartbeat(
+ groupId = "test-group",
+ memberId = streamsGroupHeartbeatResponse.memberId(),
+ memberEpoch = 999, // Too high member epoch
+ rebalanceTimeoutMs = 1000,
+ activeTasks = List.empty,
+ standbyTasks = List.empty,
+ warmupTasks = List.empty,
+ topology = null,
+ expectedError = Errors.FENCED_MEMBER_EPOCH
+ )
+
+ val expectedResponse = new StreamsGroupHeartbeatResponseData()
+ .setErrorCode(Errors.FENCED_MEMBER_EPOCH.code())
+ .setErrorMessage("The streams group member has a greater member epoch
(999) than the one known by the group coordinator (1). The member must abandon
all its partitions and rejoin.")
+ assertEquals(expectedResponse, streamsGroupHeartbeatResponse)
+ } finally {
+ admin.close()
+ }
+ }
+
+ private def convertTaskIds(responseTasks:
java.util.List[StreamsGroupHeartbeatResponseData.TaskIds]):
List[StreamsGroupHeartbeatRequestData.TaskIds] = {
+ if (responseTasks == null) {
+ List.empty
+ } else {
+ responseTasks.asScala.map { responseTask =>
+ new StreamsGroupHeartbeatRequestData.TaskIds()
+ .setSubtopologyId(responseTask.subtopologyId)
+ .setPartitions(responseTask.partitions)
+ }.toList
+ }
+ }
+
+ private def createMockTopology(topicName: String):
StreamsGroupHeartbeatRequestData.Topology = {
+ new StreamsGroupHeartbeatRequestData.Topology()
+ .setEpoch(1)
+ .setSubtopologies(List(
+ new StreamsGroupHeartbeatRequestData.Subtopology()
+ .setSubtopologyId("subtopology-1")
+ .setSourceTopics(List(topicName).asJava)
+ .setRepartitionSinkTopics(List.empty.asJava)
+ .setRepartitionSourceTopics(List.empty.asJava)
+ .setStateChangelogTopics(List.empty.asJava)
+ ).asJava)
+ }
+}
\ No newline at end of file