bbejeck commented on code in PR #19189:
URL: https://github.com/apache/kafka/pull/19189#discussion_r1996114250
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -15715,6 +15719,342 @@ public void testMemberJoinsEmptyStreamsGroup() {
assertRecordsEquals(expectedRecords, result.records());
}
+ @Test
+ public void testStreamsGroupMemberJoiningWithMissingSourceTopic() {
+ String groupId = "fooup";
+ String memberId = Uuid.randomUuid().toString();
+
+ String subtopology1 = "subtopology1";
+ String fooTopicName = "foo";
+ Uuid fooTopicId = Uuid.randomUuid();
+ String subtopology2 = "subtopology2";
+ String barTopicName = "bar";
+ Topology topology = new Topology().setSubtopologies(List.of(
+ new
Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName)),
+ new
Subtopology().setSubtopologyId(subtopology2).setSourceTopics(List.of(barTopicName))
+ ));
+
+ MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withStreamsGroupTaskAssignors(List.of(assignor))
+ .withMetadataImage(new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 6)
+ .build())
+ .build();
+
+ // Member joins the streams group.
+ CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord>
result = context.streamsGroupHeartbeat(
+ new StreamsGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(1500)
+ .setTopology(topology)
+ .setProcessId(DEFAULT_PROCESS_ID)
+ .setActiveTasks(List.of())
+ .setStandbyTasks(List.of())
+ .setWarmupTasks(List.of()));
+
+ assertEquals(
+ Map.of(),
+ result.response().creatableTopics()
+ );
+ assertResponseEquals(
+ new StreamsGroupHeartbeatResponseData()
+ .setMemberId(memberId)
+ .setMemberEpoch(1)
+ .setHeartbeatIntervalMs(5000)
+ .setActiveTasks(List.of())
+ .setStandbyTasks(List.of())
+ .setWarmupTasks(List.of())
+ .setStatus(List.of(new
StreamsGroupHeartbeatResponseData.Status()
+ .setStatusCode(Status.MISSING_SOURCE_TOPICS.code())
+ .setStatusDetail("Source topics bar are missing."))),
+ result.response().data()
+ );
+
+ StreamsGroupMember expectedMember =
streamsGroupMemberBuilderWithDefaults(memberId)
+
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
+ .setMemberEpoch(1)
+ .setPreviousMemberEpoch(0)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setRebalanceTimeoutMs(1500)
+ .build();
+
+ List<CoordinatorRecord> expectedRecords = List.of(
+
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId,
expectedMember),
+
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId,
topology),
+
StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId,
Map.of(
+ fooTopicName, new
org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId,
fooTopicName, 6)
+ )),
+
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1),
+
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId,
memberId, TasksTuple.EMPTY),
+
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
1),
+
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId,
expectedMember)
+ );
+
+ assertRecordsEquals(expectedRecords, result.records());
+ }
+
+ @Test
+ public void testStreamsGroupMemberJoiningWithMissingInternalTopic() {
+ String groupId = "fooup";
+ String memberId = Uuid.randomUuid().toString();
+
+ String subtopology1 = "subtopology1";
+ String fooTopicName = "foo";
+ Uuid fooTopicId = Uuid.randomUuid();
+ String barTopicName = "bar";
+ Topology topology = new Topology().setSubtopologies(List.of(
+ new Subtopology()
+ .setSubtopologyId(subtopology1)
+ .setSourceTopics(List.of(fooTopicName))
+ .setStateChangelogTopics(List.of(new
TopicInfo().setName(barTopicName)))
+ )
+ );
+
+ MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withStreamsGroupTaskAssignors(List.of(assignor))
+ .withMetadataImage(new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 6)
+ .build())
+ .build();
+
+ // Member joins the streams group.
+ CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord>
result = context.streamsGroupHeartbeat(
+ new StreamsGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(1500)
+ .setTopology(topology)
+ .setProcessId(DEFAULT_PROCESS_ID)
+ .setActiveTasks(List.of())
+ .setStandbyTasks(List.of())
+ .setWarmupTasks(List.of()));
+
+ assertEquals(
+ Map.of(barTopicName, new
CreatableTopic().setName(barTopicName).setNumPartitions(6).setReplicationFactor((short)
-1)),
Review Comment:
nit: put builder calls on new line
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2250,25 +2250,27 @@ private StreamsTopology maybeUpdateTopology(final
String groupId,
final Topology topology,
final StreamsGroup group,
final List<CoordinatorRecord>
records) {
- StreamsTopology updatedTopology;
if (topology != null) {
- StreamsGroupTopologyValue recordValue =
convertToStreamsGroupTopologyRecord(topology);
-
- updatedTopology = StreamsTopology.fromHeartbeatRequest(topology);
-
+ StreamsTopology streamsTopologyFromRequest =
StreamsTopology.fromHeartbeatRequest(topology);
if (group.topology().isEmpty()) {
log.info("[GroupId {}][MemberId {}] Member initialized the
topology with epoch {}", groupId, memberId, topology.epoch());
-
+ StreamsGroupTopologyValue recordValue =
convertToStreamsGroupTopologyRecord(topology);
records.add(newStreamsGroupTopologyRecord(groupId,
recordValue));
- } else if (!updatedTopology.equals(group.topology().get())) {
+ return streamsTopologyFromRequest;
+ } else if (group.topology().get().topologyEpoch() >
topology.epoch()) {
+ log.info("[GroupId {}][MemberId {}] Member joined with stake
topology epoch {}", groupId, memberId, topology.epoch());
+ return group.topology().get();
+ } else if
(!group.topology().get().equals(streamsTopologyFromRequest)) {
throw new InvalidRequestException("Topology updates are not
supported yet.");
+ } else {
+ log.debug("[GroupId {}][MemberId {}] Member joined with
currently initialized topology {}", groupId, memberId, topology.epoch());
+ return group.topology().get();
Review Comment:
should any of the calls to `group.topology().get()` check for `isPresent()`?
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -15715,6 +15719,342 @@ public void testMemberJoinsEmptyStreamsGroup() {
assertRecordsEquals(expectedRecords, result.records());
}
+ @Test
+ public void testStreamsGroupMemberJoiningWithMissingSourceTopic() {
+ String groupId = "fooup";
+ String memberId = Uuid.randomUuid().toString();
+
Review Comment:
nit: remove extra line
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -15715,6 +15719,342 @@ public void testMemberJoinsEmptyStreamsGroup() {
assertRecordsEquals(expectedRecords, result.records());
}
+ @Test
+ public void testStreamsGroupMemberJoiningWithMissingSourceTopic() {
+ String groupId = "fooup";
+ String memberId = Uuid.randomUuid().toString();
+
+ String subtopology1 = "subtopology1";
+ String fooTopicName = "foo";
+ Uuid fooTopicId = Uuid.randomUuid();
+ String subtopology2 = "subtopology2";
+ String barTopicName = "bar";
+ Topology topology = new Topology().setSubtopologies(List.of(
+ new
Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName)),
+ new
Subtopology().setSubtopologyId(subtopology2).setSourceTopics(List.of(barTopicName))
+ ));
+
+ MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withStreamsGroupTaskAssignors(List.of(assignor))
+ .withMetadataImage(new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 6)
+ .build())
+ .build();
+
+ // Member joins the streams group.
+ CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord>
result = context.streamsGroupHeartbeat(
+ new StreamsGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(1500)
+ .setTopology(topology)
+ .setProcessId(DEFAULT_PROCESS_ID)
+ .setActiveTasks(List.of())
+ .setStandbyTasks(List.of())
+ .setWarmupTasks(List.of()));
+
+ assertEquals(
+ Map.of(),
+ result.response().creatableTopics()
+ );
+ assertResponseEquals(
+ new StreamsGroupHeartbeatResponseData()
+ .setMemberId(memberId)
+ .setMemberEpoch(1)
+ .setHeartbeatIntervalMs(5000)
+ .setActiveTasks(List.of())
+ .setStandbyTasks(List.of())
+ .setWarmupTasks(List.of())
+ .setStatus(List.of(new
StreamsGroupHeartbeatResponseData.Status()
+ .setStatusCode(Status.MISSING_SOURCE_TOPICS.code())
+ .setStatusDetail("Source topics bar are missing."))),
+ result.response().data()
+ );
+
+ StreamsGroupMember expectedMember =
streamsGroupMemberBuilderWithDefaults(memberId)
+
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
+ .setMemberEpoch(1)
+ .setPreviousMemberEpoch(0)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setRebalanceTimeoutMs(1500)
+ .build();
+
+ List<CoordinatorRecord> expectedRecords = List.of(
+
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId,
expectedMember),
+
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId,
topology),
+
StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId,
Map.of(
+ fooTopicName, new
org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId,
fooTopicName, 6)
Review Comment:
nit: put `Map.of` on a new line
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2250,25 +2250,27 @@ private StreamsTopology maybeUpdateTopology(final
String groupId,
final Topology topology,
final StreamsGroup group,
final List<CoordinatorRecord>
records) {
- StreamsTopology updatedTopology;
if (topology != null) {
- StreamsGroupTopologyValue recordValue =
convertToStreamsGroupTopologyRecord(topology);
-
- updatedTopology = StreamsTopology.fromHeartbeatRequest(topology);
-
+ StreamsTopology streamsTopologyFromRequest =
StreamsTopology.fromHeartbeatRequest(topology);
if (group.topology().isEmpty()) {
log.info("[GroupId {}][MemberId {}] Member initialized the
topology with epoch {}", groupId, memberId, topology.epoch());
-
+ StreamsGroupTopologyValue recordValue =
convertToStreamsGroupTopologyRecord(topology);
records.add(newStreamsGroupTopologyRecord(groupId,
recordValue));
- } else if (!updatedTopology.equals(group.topology().get())) {
+ return streamsTopologyFromRequest;
+ } else if (group.topology().get().topologyEpoch() >
topology.epoch()) {
+ log.info("[GroupId {}][MemberId {}] Member joined with stake
topology epoch {}", groupId, memberId, topology.epoch());
Review Comment:
What's `stake topology`? did you mean `state`
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -15715,6 +15719,342 @@ public void testMemberJoinsEmptyStreamsGroup() {
assertRecordsEquals(expectedRecords, result.records());
}
+ @Test
+ public void testStreamsGroupMemberJoiningWithMissingSourceTopic() {
+ String groupId = "fooup";
+ String memberId = Uuid.randomUuid().toString();
+
+ String subtopology1 = "subtopology1";
+ String fooTopicName = "foo";
+ Uuid fooTopicId = Uuid.randomUuid();
+ String subtopology2 = "subtopology2";
+ String barTopicName = "bar";
+ Topology topology = new Topology().setSubtopologies(List.of(
+ new
Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName)),
+ new
Subtopology().setSubtopologyId(subtopology2).setSourceTopics(List.of(barTopicName))
+ ));
+
+ MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withStreamsGroupTaskAssignors(List.of(assignor))
+ .withMetadataImage(new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 6)
+ .build())
+ .build();
+
+ // Member joins the streams group.
+ CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord>
result = context.streamsGroupHeartbeat(
+ new StreamsGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(1500)
+ .setTopology(topology)
+ .setProcessId(DEFAULT_PROCESS_ID)
+ .setActiveTasks(List.of())
+ .setStandbyTasks(List.of())
+ .setWarmupTasks(List.of()));
+
+ assertEquals(
+ Map.of(),
+ result.response().creatableTopics()
+ );
+ assertResponseEquals(
+ new StreamsGroupHeartbeatResponseData()
+ .setMemberId(memberId)
+ .setMemberEpoch(1)
+ .setHeartbeatIntervalMs(5000)
+ .setActiveTasks(List.of())
+ .setStandbyTasks(List.of())
+ .setWarmupTasks(List.of())
+ .setStatus(List.of(new
StreamsGroupHeartbeatResponseData.Status()
+ .setStatusCode(Status.MISSING_SOURCE_TOPICS.code())
+ .setStatusDetail("Source topics bar are missing."))),
+ result.response().data()
+ );
+
+ StreamsGroupMember expectedMember =
streamsGroupMemberBuilderWithDefaults(memberId)
+
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
+ .setMemberEpoch(1)
+ .setPreviousMemberEpoch(0)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setRebalanceTimeoutMs(1500)
+ .build();
+
+ List<CoordinatorRecord> expectedRecords = List.of(
+
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId,
expectedMember),
+
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId,
topology),
+
StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId,
Map.of(
+ fooTopicName, new
org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId,
fooTopicName, 6)
+ )),
+
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1),
+
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId,
memberId, TasksTuple.EMPTY),
+
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
1),
+
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId,
expectedMember)
+ );
+
+ assertRecordsEquals(expectedRecords, result.records());
+ }
+
+ @Test
+ public void testStreamsGroupMemberJoiningWithMissingInternalTopic() {
+ String groupId = "fooup";
+ String memberId = Uuid.randomUuid().toString();
+
Review Comment:
nit: remove extra line same for similar tests below
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManager.java:
##########
@@ -180,17 +178,13 @@ private static void enforceCopartitioning(final
StreamsTopology topology,
x.repartitionSourceTopics().stream().filter(y ->
y.partitions() == 0)
).map(StreamsGroupTopologyValue.TopicInfo::name).collect(Collectors.toSet());
- if (fixedRepartitionTopics.isEmpty() &&
flexibleRepartitionTopics.isEmpty()) {
- log.info("Skipping the repartition topic validation since there
are no repartition topics.");
- } else {
- // ensure the co-partitioning topics within the group have the
same number of partitions,
- // and enforce the number of partitions for those repartition
topics to be the same if they
- // are co-partitioned as well.
- for (Collection<Set<String>> copartitionGroups :
copartitionGroupsBySubtopology.values()) {
- for (Set<String> copartitionGroup : copartitionGroups) {
- decidedPartitionCountsForInternalTopics.putAll(
- copartitionedTopicsEnforcer.enforce(copartitionGroup,
fixedRepartitionTopics, flexibleRepartitionTopics));
- }
+ // ensure the co-partitioning topics within the group have the same
number of partitions,
+ // and enforce the number of partitions for those repartition topics
to be the same if they
+ // are co-partitioned as well.
+ for (Collection<Set<String>> copartitionGroups :
copartitionGroupsBySubtopology.values()) {
+ for (Set<String> copartitionGroup : copartitionGroups) {
+ decidedPartitionCountsForInternalTopics.putAll(
+ copartitionedTopicsEnforcer.enforce(copartitionGroup,
fixedRepartitionTopics, flexibleRepartitionTopics));
Review Comment:
How would this affect a topology where the user wants to expand partitions
with a `repartition` operator? Would it throw an error in that case?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]