cadonna commented on code in PR #19114:
URL: https://github.com/apache/kafka/pull/19114#discussion_r2020663333
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -15293,6 +15305,2054 @@ public void testShareGroupStates() {
assertEquals(ShareGroup.ShareGroupState.STABLE, context.shareGroupState(groupId));
}
+ @Test
+ public void testStreamsHeartbeatRequestValidation() {
+ String memberId = Uuid.randomUuid().toString();
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+ .build();
+ Exception ex;
+
+ // MemberId must be present in all requests.
+        ex = assertThrows(InvalidRequestException.class, () -> context.streamsGroupHeartbeat(
+ new StreamsGroupHeartbeatRequestData()));
+ assertEquals("MemberId can't be empty.", ex.getMessage());
+
+ // MemberId can't be all whitespaces.
+        ex = assertThrows(InvalidRequestException.class, () -> context.streamsGroupHeartbeat(
+ new StreamsGroupHeartbeatRequestData()
+ .setMemberId(" ")));
+ assertEquals("MemberId can't be empty.", ex.getMessage());
+
+ // GroupId must be present in all requests.
+        ex = assertThrows(InvalidRequestException.class, () -> context.streamsGroupHeartbeat(
+ new StreamsGroupHeartbeatRequestData()
+ .setMemberId(memberId)));
+ assertEquals("GroupId can't be empty.", ex.getMessage());
+
+ // GroupId can't be all whitespaces.
+        ex = assertThrows(InvalidRequestException.class, () -> context.streamsGroupHeartbeat(
+ new StreamsGroupHeartbeatRequestData()
+ .setMemberId(memberId)
+ .setGroupId(" ")));
+ assertEquals("GroupId can't be empty.", ex.getMessage());
+
+        // RebalanceTimeoutMs must be present in the first request (epoch == 0).
+        ex = assertThrows(InvalidRequestException.class, () -> context.streamsGroupHeartbeat(
+ new StreamsGroupHeartbeatRequestData()
+ .setMemberId(memberId)
+ .setGroupId("foo")
+ .setMemberEpoch(0)));
+        assertEquals("RebalanceTimeoutMs must be provided in first request.", ex.getMessage());
+
+        // ActiveTasks must be present and empty in the first request (epoch == 0).
+        ex = assertThrows(InvalidRequestException.class, () -> context.streamsGroupHeartbeat(
+ new StreamsGroupHeartbeatRequestData()
+ .setMemberId(memberId)
+ .setGroupId("foo")
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(1500)));
+        assertEquals("ActiveTasks must be empty when (re-)joining.", ex.getMessage());
+
+        // StandbyTasks must be present and empty in the first request (epoch == 0).
+        ex = assertThrows(InvalidRequestException.class, () -> context.streamsGroupHeartbeat(
+ new StreamsGroupHeartbeatRequestData()
+ .setMemberId(memberId)
+ .setGroupId("foo")
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(1500)
+ .setActiveTasks(List.of())));
+        assertEquals("StandbyTasks must be empty when (re-)joining.", ex.getMessage());
+
+        // WarmupTasks must be present and empty in the first request (epoch == 0).
+        ex = assertThrows(InvalidRequestException.class, () -> context.streamsGroupHeartbeat(
+ new StreamsGroupHeartbeatRequestData()
+ .setMemberId(memberId)
+ .setGroupId("foo")
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(1500)
+ .setActiveTasks(List.of())
+ .setStandbyTasks(List.of())));
+        assertEquals("WarmupTasks must be empty when (re-)joining.", ex.getMessage());
+
+ // Topology must be present in the first request (epoch == 0).
+        ex = assertThrows(InvalidRequestException.class, () -> context.streamsGroupHeartbeat(
+ new StreamsGroupHeartbeatRequestData()
+ .setMemberId(memberId)
+ .setGroupId("foo")
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(1500)
+ .setActiveTasks(List.of())
+ .setStandbyTasks(List.of())
+ .setWarmupTasks(List.of())));
+        assertEquals("Topology must be non-null when (re-)joining.", ex.getMessage());
+
+ // InstanceId must be non-empty if provided in all requests.
+        ex = assertThrows(InvalidRequestException.class, () -> context.streamsGroupHeartbeat(
+ new StreamsGroupHeartbeatRequestData()
+ .setGroupId("foo")
+ .setMemberId(memberId)
+ .setMemberEpoch(1)
+ .setInstanceId("")));
+ assertEquals("InstanceId can't be empty.", ex.getMessage());
+
+ // RackId must be non-empty if provided in all requests.
+        ex = assertThrows(InvalidRequestException.class, () -> context.streamsGroupHeartbeat(
+ new StreamsGroupHeartbeatRequestData()
+ .setGroupId("foo")
+ .setMemberId(memberId)
+ .setMemberEpoch(1)
+ .setRackId("")));
+ assertEquals("RackId can't be empty.", ex.getMessage());
+
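+        // InstanceId must be non-null when a static member leaves the group (memberEpoch == LEAVE_GROUP_STATIC_MEMBER_EPOCH).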
+        ex = assertThrows(InvalidRequestException.class, () -> context.streamsGroupHeartbeat(
+ new StreamsGroupHeartbeatRequestData()
+ .setGroupId("foo")
+ .setMemberId(memberId)
+ .setMemberEpoch(LEAVE_GROUP_STATIC_MEMBER_EPOCH)
+ .setRebalanceTimeoutMs(1500)
+ .setTopology(new StreamsGroupHeartbeatRequestData.Topology())
+ .setActiveTasks(List.of())
+ .setStandbyTasks(List.of())
+ .setWarmupTasks(List.of())));
+ assertEquals("InstanceId can't be null.", ex.getMessage());
+
+ // Member epoch cannot be < -2
+        ex = assertThrows(InvalidRequestException.class, () -> context.streamsGroupHeartbeat(
+ new StreamsGroupHeartbeatRequestData()
+ .setMemberId(memberId)
+ .setGroupId("foo")
+ .setMemberEpoch(-3)
+ .setRebalanceTimeoutMs(1500)
+ ));
+        assertEquals("MemberEpoch is -3, but must be greater than or equal to -2.", ex.getMessage());
+
+ // Topology must not be present in the later requests (epoch != 0).
+        ex = assertThrows(InvalidRequestException.class, () -> context.streamsGroupHeartbeat(
+ new StreamsGroupHeartbeatRequestData()
+ .setMemberId(memberId)
+ .setGroupId("foo")
+ .setMemberEpoch(1)
+ .setRebalanceTimeoutMs(1500)
+ .setActiveTasks(List.of())
+ .setStandbyTasks(List.of())
+ .setWarmupTasks(List.of())
+ .setTopology(new StreamsGroupHeartbeatRequestData.Topology())
+ ));
+        assertEquals("Topology can only be provided when (re-)joining.", ex.getMessage());
+
+        // Topology must not contain changelog topics with fixed partition numbers
+        StreamsInvalidTopologyException topoEx = assertThrows(StreamsInvalidTopologyException.class, () -> context.streamsGroupHeartbeat(
+ new StreamsGroupHeartbeatRequestData()
+ .setMemberId(memberId)
+ .setGroupId("foo")
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(1500)
+ .setActiveTasks(List.of())
+ .setStandbyTasks(List.of())
+ .setWarmupTasks(List.of())
+                .setTopology(new StreamsGroupHeartbeatRequestData.Topology().setSubtopologies(
+ List.of(
+ new StreamsGroupHeartbeatRequestData.Subtopology()
+ .setStateChangelogTopics(
+ List.of(
+                                    new StreamsGroupHeartbeatRequestData.TopicInfo()
+                                        .setName("changelog_topic_with_fixed_partition")
+                                        .setPartitions(3)
+ )
+ )
+ )
+ ))
+ ));
+        assertEquals("Changelog topic changelog_topic_with_fixed_partition must have an undefined partition count, but it is set to 3.",
+ topoEx.getMessage());
+ }
+
+ @Test
+ public void testUnknownStreamsGroupId() {
+ String groupId = "fooup";
+ String memberId = Uuid.randomUuid().toString();
+
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+ .build();
+
+        GroupIdNotFoundException e = assertThrows(GroupIdNotFoundException.class, () ->
+ context.streamsGroupHeartbeat(
+ new StreamsGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setMemberEpoch(100) // Epoch must be > 0.
+ .setRebalanceTimeoutMs(1500)
+ .setActiveTasks(List.of())
+ .setStandbyTasks(List.of())
+ .setWarmupTasks(List.of())));
+ assertEquals("Streams group fooup not found.", e.getMessage());
+ }
+
+ @Test
+ public void testUnknownMemberIdJoinsStreamsGroup() {
+ String groupId = "fooup";
+ String memberId = Uuid.randomUuid().toString();
+ Topology topology = new Topology();
+
+ MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+ .withStreamsGroupTaskAssignors(List.of(assignor))
+ .build();
+
+ assignor.prepareGroupAssignment(Map.of(memberId, TasksTuple.EMPTY));
+
+ // A first member joins to create the group.
+ context.streamsGroupHeartbeat(
+ new StreamsGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(1500)
+ .setTopology(topology)
+ .setActiveTasks(List.of())
+ .setStandbyTasks(List.of())
+ .setWarmupTasks(List.of()));
+
+ // The second member is rejected because the member id is unknown and
+ // the member epoch is not zero.
+ final String memberId2 = Uuid.randomUuid().toString();
+        UnknownMemberIdException e = assertThrows(UnknownMemberIdException.class, () ->
+ context.streamsGroupHeartbeat(
+ new StreamsGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId2)
+ .setMemberEpoch(1)
+ .setRebalanceTimeoutMs(1500)
+ .setActiveTasks(List.of())
+ .setStandbyTasks(List.of())
+ .setWarmupTasks(List.of())));
+        assertEquals(String.format("Member %s is not a member of group %s.", memberId2, groupId), e.getMessage());
+ }
+
+ @Test
+ public void testStreamsGroupMemberEpochValidation() {
+ String groupId = "fooup";
+ String memberId = Uuid.randomUuid().toString();
+ String subtopology1 = "subtopology1";
+ String fooTopicName = "foo";
+ Uuid fooTopicId = Uuid.randomUuid();
+ Topology topology = new Topology().setSubtopologies(List.of(
+            new Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
+ ));
+
+ MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+ .withStreamsGroupTaskAssignors(List.of(assignor))
+ .withMetadataImage(new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 6)
+ .build())
+ .build();
+ assignor.prepareGroupAssignment(Map.of(memberId, TasksTuple.EMPTY));
+
+        StreamsGroupMember member = streamsGroupMemberBuilderWithDefaults(memberId)
+ .setMemberEpoch(100)
+ .setPreviousMemberEpoch(99)
+            .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, TaskAssignmentTestUtil.mkTasks(subtopology1, 1, 2, 3)))
+ .build();
+
+        context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, member));
+
+        context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 100));
+
+        context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId, topology));
+
+        context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId,
+ TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+ TaskAssignmentTestUtil.mkTasks(subtopology1, 1, 2, 3)
+ )));
+
+        context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, 100));
+
+        context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, member));
+
+ // Member epoch is greater than the expected epoch.
+        FencedMemberEpochException e1 = assertThrows(FencedMemberEpochException.class, () ->
+ context.streamsGroupHeartbeat(
+ new StreamsGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setMemberEpoch(200)
+ .setRebalanceTimeoutMs(1500)));
+        assertEquals("The streams group member has a greater member epoch (200) than the one known by the group coordinator (100). "
+            + "The member must abandon all its partitions and rejoin.", e1.getMessage());
+
+ // Member epoch is smaller than the expected epoch.
+        FencedMemberEpochException e2 = assertThrows(FencedMemberEpochException.class, () ->
+ context.streamsGroupHeartbeat(
+ new StreamsGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setMemberEpoch(50)
+ .setRebalanceTimeoutMs(1500)));
+        assertEquals("The streams group member has a smaller member epoch (50) than the one known by the group coordinator (100). "
+            + "The member must abandon all its partitions and rejoin.", e2.getMessage());
+
+ // Member joins with previous epoch but without providing tasks.
+        FencedMemberEpochException e3 = assertThrows(FencedMemberEpochException.class, () ->
+ context.streamsGroupHeartbeat(
+ new StreamsGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setMemberEpoch(99)
+ .setRebalanceTimeoutMs(1500)));
+        assertEquals("The streams group member has a smaller member epoch (99) than the one known by the group coordinator (100). "
+            + "The member must abandon all its partitions and rejoin.", e3.getMessage());
+
+        // Member joins with previous epoch and has a subset of the owned tasks.
+        // This is accepted as the response with the bumped epoch may have been lost.
+        // In this case, we provide back the correct epoch to the member.
+        CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> result = context.streamsGroupHeartbeat(
+ new StreamsGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setMemberEpoch(99)
+ .setRebalanceTimeoutMs(1500)
+                .setActiveTasks(List.of(new StreamsGroupHeartbeatRequestData.TaskIds()
+ .setSubtopologyId(subtopology1)
+ .setPartitions(List.of(1, 2))))
+ .setStandbyTasks(List.of())
+ .setWarmupTasks(List.of()));
+ assertEquals(100, result.response().data().memberEpoch());
+ }
+
+ @Test
+ public void testMemberJoinsEmptyStreamsGroup() {
+ String groupId = "fooup";
+ String memberId = Uuid.randomUuid().toString();
+
+ String subtopology1 = "subtopology1";
+ String fooTopicName = "foo";
+ Uuid fooTopicId = Uuid.randomUuid();
+ String subtopology2 = "subtopology2";
+ String barTopicName = "bar";
+ Uuid barTopicId = Uuid.randomUuid();
+ Topology topology = new Topology().setSubtopologies(List.of(
+            new Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName)),
+            new Subtopology().setSubtopologyId(subtopology2).setSourceTopics(List.of(barTopicName))
+ ));
+
+ MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+ .withStreamsGroupTaskAssignors(List.of(assignor))
+ .withMetadataImage(new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 6)
+ .addTopic(barTopicId, barTopicName, 3)
+ .build())
+ .build();
+
+        assignor.prepareGroupAssignment(Map.of(memberId, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+ TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5),
+ TaskAssignmentTestUtil.mkTasks(subtopology2, 0, 1, 2)
+ )));
+
+ assertThrows(GroupIdNotFoundException.class, () ->
+ context.groupMetadataManager.streamsGroup(groupId));
+
+        CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> result = context.streamsGroupHeartbeat(
+ new StreamsGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setMemberEpoch(0)
+ .setProcessId("process-id")
+ .setRebalanceTimeoutMs(1500)
+ .setTopology(topology)
+ .setActiveTasks(List.of())
+ .setStandbyTasks(List.of())
+ .setWarmupTasks(List.of()));
+
+ assertResponseEquals(
+ new StreamsGroupHeartbeatResponseData()
+ .setMemberId(memberId)
+ .setMemberEpoch(1)
+ .setHeartbeatIntervalMs(5000)
+ .setActiveTasks(List.of(
+ new StreamsGroupHeartbeatResponseData.TaskIds()
+ .setSubtopologyId(subtopology1)
+ .setPartitions(List.of(0, 1, 2, 3, 4, 5)),
+ new StreamsGroupHeartbeatResponseData.TaskIds()
+ .setSubtopologyId(subtopology2)
+ .setPartitions(List.of(0, 1, 2))
+ ))
+ .setStandbyTasks(List.of())
+ .setWarmupTasks(List.of()),
+ result.response().data()
+ );
+
+        StreamsGroupMember expectedMember = streamsGroupMemberBuilderWithDefaults(memberId)
+            .setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
+ .setMemberEpoch(1)
+ .setPreviousMemberEpoch(0)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setRebalanceTimeoutMs(1500)
+            .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+ TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5),
+ TaskAssignmentTestUtil.mkTasks(subtopology2, 0, 1, 2)))
+ .build();
+
+ List<CoordinatorRecord> expectedRecords = List.of(
+            StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, expectedMember),
+            StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId, topology),
+            StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId, Map.of(
+                fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6),
+                barTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(barTopicId, barTopicName, 3)
+            )),
+            StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1),
+            StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId,
+                TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                    TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5),
+                    TaskAssignmentTestUtil.mkTasks(subtopology2, 0, 1, 2)
+                )),
+            StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, 1),
+            StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, expectedMember)
+ );
+
+ assertRecordsEquals(expectedRecords, result.records());
+ }
+
+ @Test
+    public void testStreamsUpdatingMemberMetadataTriggersNewTargetAssignment() {
+ String groupId = "fooup";
+ String memberId = Uuid.randomUuid().toString();
+
+ String subtopology1 = "subtopology1";
+ String fooTopicName = "foo";
+ Uuid fooTopicId = Uuid.randomUuid();
+ String subtopology2 = "subtopology2";
+ String barTopicName = "bar";
+ Uuid barTopicId = Uuid.randomUuid();
+ Topology topology = new Topology().setSubtopologies(List.of(
+            new Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName)),
+            new Subtopology().setSubtopologyId(subtopology2).setSourceTopics(List.of(barTopicName))
+ ));
+
+ MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withStreamsGroupTaskAssignors(List.of(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addTopic(barTopicId, barTopicName, 3)
+                .build())
+            .withStreamsGroup(new StreamsGroupBuilder(groupId, 10)
+                .withMember(streamsGroupMemberBuilderWithDefaults(memberId)
+                    .setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(9)
+                    .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                        TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5)))
+                    .build())
+                .withTargetAssignment(memberId, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                    TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5)))
+                .withTargetAssignmentEpoch(10)
+                .withTopology(StreamsTopology.fromHeartbeatRequest(topology))
+                .withPartitionMetadata(Map.of(
+                    fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6),
+                    barTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(barTopicId, barTopicName, 3)
+                ))
+            )
+            .build();
+
+ assignor.prepareGroupAssignment(
+            Map.of(memberId, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+ TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5),
+ TaskAssignmentTestUtil.mkTasks(subtopology2, 0, 1, 2)
+ ))
+ );
+
+        CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> result = context.streamsGroupHeartbeat(
+ new StreamsGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setMemberEpoch(10)
+ .setProcessId("process-id2")
+ );
+
+ assertResponseEquals(
+ new StreamsGroupHeartbeatResponseData()
+ .setMemberId(memberId)
+ .setMemberEpoch(11)
+ .setHeartbeatIntervalMs(5000)
+ .setActiveTasks(List.of(
+ new StreamsGroupHeartbeatResponseData.TaskIds()
+ .setSubtopologyId(subtopology1)
+ .setPartitions(List.of(0, 1, 2, 3, 4, 5)),
+ new StreamsGroupHeartbeatResponseData.TaskIds()
+ .setSubtopologyId(subtopology2)
+ .setPartitions(List.of(0, 1, 2))
+ ))
+ .setStandbyTasks(List.of())
+ .setWarmupTasks(List.of()),
+ result.response().data()
+ );
+
+        StreamsGroupMember expectedMember = streamsGroupMemberBuilderWithDefaults(memberId)
+            .setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
+ .setMemberEpoch(11)
+ .setPreviousMemberEpoch(10)
+            .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+ TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5),
+ TaskAssignmentTestUtil.mkTasks(subtopology2, 0, 1, 2)))
+ .setProcessId("process-id2")
+ .build();
+
+ List<CoordinatorRecord> expectedRecords = List.of(
+            StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, expectedMember),
+            StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11),
+            StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId,
+                TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                    TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5),
+                    TaskAssignmentTestUtil.mkTasks(subtopology2, 0, 1, 2)
+                )),
+            StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, 11),
+            StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, expectedMember)
+ );
+
+ assertRecordsEquals(expectedRecords, result.records());
+ }
+
+ @Test
+    public void testStreamsUpdatingPartitionMetadataTriggersNewTargetAssignment() {
+ String groupId = "fooup";
+ String memberId = Uuid.randomUuid().toString();
+ String subtopology1 = "subtopology1";
+ String fooTopicName = "foo";
+ Uuid fooTopicId = Uuid.randomUuid();
+ String subtopology2 = "subtopology2";
+ String barTopicName = "bar";
+ Uuid barTopicId = Uuid.randomUuid();
+ Topology topology = new Topology().setSubtopologies(List.of(
+            new Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName)),
+            new Subtopology().setSubtopologyId(subtopology2).setSourceTopics(List.of(barTopicName))
+ ));
+
+ MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withStreamsGroupTaskAssignors(List.of(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addTopic(barTopicId, barTopicName, 3)
+                .build())
+            .withStreamsGroup(new StreamsGroupBuilder(groupId, 10)
+                .withMember(streamsGroupMemberBuilderWithDefaults(memberId)
+                    .setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(9)
+                    .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                        TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5)))
+                    .build())
+                .withTargetAssignment(memberId, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                    TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5)))
+                .withTargetAssignmentEpoch(10)
+                .withTopology(StreamsTopology.fromHeartbeatRequest(topology))
Review Comment:
Yeah, sure. My bad! I meant it as you did.