lucasbru commented on code in PR #20760:
URL: https://github.com/apache/kafka/pull/20760#discussion_r2494287393
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java:
##########
@@ -678,6 +679,112 @@ public void testValidateOffsetCommit(short version) {
}
}
+ @Test
+ public void testValidateOffsetCommitWithOlderEpoch() {
+ StreamsGroup group = createStreamsGroup("group-foo");
+
+ group.setTopology(new StreamsTopology(1, Map.of("0", new
StreamsGroupTopologyValue.Subtopology()
+ .setSubtopologyId("0")
+ .setSourceTopics(List.of("input-topic")))));
+
+ group.updateMember(new StreamsGroupMember.Builder("member-1")
+ .setMemberEpoch(2)
+ .setAssignedTasks(new TasksTupleWithEpochs(
+ Map.of("0", Map.of(0, 2, 1, 2)),
+ Map.of(), Map.of()))
+ .build());
+
+ CommitPartitionValidator validator = group.validateOffsetCommit(
+ "member-1", "", 1, false, ApiKeys.OFFSET_COMMIT.latestVersion());
+
+ // Received epoch (1) < assignment epoch (2) should throw
+ assertThrows(StaleMemberEpochException.class, () ->
+ validator.validate("input-topic", Uuid.ZERO_UUID, 0));
+ }
+
+ @Test
+ public void testValidateOffsetCommitWithOlderEpochMissingTopology() {
+ StreamsGroup group = createStreamsGroup("group-foo");
+
+ group.updateMember(new StreamsGroupMember.Builder("member-1")
+ .setMemberEpoch(2)
+ .build());
+
+ // Topology is retrieved when creating validator, so exception is thrown here
+ assertThrows(StaleMemberEpochException.class, () ->
+ group.validateOffsetCommit("member-1", "", 1, false,
ApiKeys.OFFSET_COMMIT.latestVersion()));
+ }
+
+ @Test
+ public void testValidateOffsetCommitWithOlderEpochMissingSubtopology() {
+ StreamsGroup group = createStreamsGroup("group-foo");
+
+ group.setTopology(new StreamsTopology(1, Map.of("0", new
StreamsGroupTopologyValue.Subtopology()
+ .setSubtopologyId("0")
+ .setSourceTopics(List.of("input-topic")))));
+
+ group.updateMember(new StreamsGroupMember.Builder("member-1")
+ .setMemberEpoch(2)
+ .build());
+
+ CommitPartitionValidator validator = group.validateOffsetCommit(
+ "member-1", "", 1, false, ApiKeys.OFFSET_COMMIT.latestVersion());
+
+ assertThrows(StaleMemberEpochException.class, () ->
+ validator.validate("unknown-topic", Uuid.ZERO_UUID, 0));
+ }
+
+ @Test
+ public void testValidateOffsetCommitWithOlderEpochUnassignedPartition() {
+ StreamsGroup group = createStreamsGroup("group-foo");
+
+ group.setTopology(new StreamsTopology(1, Map.of("0", new
StreamsGroupTopologyValue.Subtopology()
+ .setSubtopologyId("0")
+ .setSourceTopics(List.of("input-topic")))));
+
+ group.updateMember(new StreamsGroupMember.Builder("member-1")
+ .setMemberEpoch(2)
+ .setAssignedTasks(new TasksTupleWithEpochs(
+ Map.of("0", Map.of(0, 2)),
+ Map.of(), Map.of()))
+ .setTasksPendingRevocation(TasksTupleWithEpochs.EMPTY)
+ .build());
+
+ CommitPartitionValidator validator = group.validateOffsetCommit(
+ "member-1", "", 1, false, ApiKeys.OFFSET_COMMIT.latestVersion());
+
+ // Partition 0 assigned with epoch 2, received epoch 1 should throw
+ assertThrows(StaleMemberEpochException.class, () ->
+ validator.validate("input-topic", Uuid.ZERO_UUID, 0));
+
+ // Partition 1 not assigned should throw
+ assertThrows(StaleMemberEpochException.class, () ->
+ validator.validate("input-topic", Uuid.ZERO_UUID, 1));
+ }
+
+ @Test
+ public void testValidateOffsetCommitWithOlderEpochValidAssignment() {
+ StreamsGroup group = createStreamsGroup("group-foo");
+
+ group.setTopology(new StreamsTopology(1, Map.of("0", new
StreamsGroupTopologyValue.Subtopology()
+ .setSubtopologyId("0")
+ .setSourceTopics(List.of("input-topic")))));
+
+ group.updateMember(new StreamsGroupMember.Builder("member-1")
+ .setMemberEpoch(5)
+ .setAssignedTasks(new TasksTupleWithEpochs(
+ Map.of("0", Map.of(0, 2, 1, 2)),
+ Map.of(), Map.of()))
+ .build());
+
+ CommitPartitionValidator validator = group.validateOffsetCommit(
+ "member-1", "", 2, false, ApiKeys.OFFSET_COMMIT.latestVersion());
Review Comment:
Yes, exactly.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]