dajac commented on code in PR #17706:
URL: https://github.com/apache/kafka/pull/17706#discussion_r1869581460
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -1936,18 +1936,25 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
val expectedOperations =
AclEntry.supportedOperations(ResourceType.GROUP)
assertEquals(expectedOperations,
testGroupDescription.authorizedOperations())
- // Test that the fake group is listed as dead.
+ // Test that the fake group throws GroupIdNotFoundException
assertTrue(describeWithFakeGroupResult.describedGroups().containsKey(fakeGroupId))
- val fakeGroupDescription =
describeWithFakeGroupResult.describedGroups().get(fakeGroupId).get()
-
- assertEquals(fakeGroupId, fakeGroupDescription.groupId())
- assertEquals(0, fakeGroupDescription.members().size())
- assertEquals("", fakeGroupDescription.partitionAssignor())
- assertEquals(GroupState.DEAD, fakeGroupDescription.groupState())
- assertEquals(expectedOperations,
fakeGroupDescription.authorizedOperations())
+ try {
Review Comment:
nit: You may be able to use the `assertFutureThrows` helper here.
##########
clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java:
##########
@@ -354,11 +356,18 @@ private void handleError(
case GROUP_ID_NOT_FOUND:
if (isConsumerGroupResponse) {
log.debug("`{}` request for group id {} failed because the
group is not " +
- "a new consumer group. Will retry with
`DescribeGroups` API.", apiName, groupId.idValue);
+ "a new consumer group. Will retry with
`DescribeGroups` API. {}",
Review Comment:
nit: Do you think that printing the error message is necessary here and
below?
##########
tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java:
##########
@@ -535,34 +536,47 @@ Map<String, Map<TopicPartition, OffsetAndMetadata>>
resetOffsets() {
switch (state) {
case "Empty":
case "Dead":
- Collection<TopicPartition> partitionsToReset =
getPartitionsToReset(groupId);
- Map<TopicPartition, OffsetAndMetadata>
preparedOffsets = prepareOffsetsToReset(groupId, partitionsToReset);
-
- // Dry-run is the default behavior if --execute is
not specified
- boolean dryRun = opts.options.has(opts.dryRunOpt)
|| !opts.options.has(opts.executeOpt);
- if (!dryRun) {
- adminClient.alterConsumerGroupOffsets(
- groupId,
- preparedOffsets,
- withTimeoutMs(new
AlterConsumerGroupOffsetsOptions())
- ).all().get();
- }
-
- result.put(groupId, preparedOffsets);
-
+ result.put(groupId,
resetOffsetsForInactiveGroup(groupId));
break;
default:
printError("Assignments can only be reset if the
group '" + groupId + "' is inactive, but the current state is " + state + ".",
Optional.empty());
result.put(groupId, Collections.emptyMap());
}
- } catch (InterruptedException | ExecutionException e) {
- throw new RuntimeException(e);
+ } catch (InterruptedException ie) {
+ throw new RuntimeException(ie);
+ } catch (ExecutionException ee) {
+ if (ee.getCause() instanceof GroupIdNotFoundException) {
+ result.put(groupId,
resetOffsetsForInactiveGroup(groupId));
+ } else {
+ throw new RuntimeException(ee);
+ }
}
});
return result;
}
+ Map<TopicPartition, OffsetAndMetadata>
resetOffsetsForInactiveGroup(String groupId) {
Review Comment:
nit: private?
##########
tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java:
##########
@@ -136,10 +136,6 @@ public void
testResetOffsetsNotExistingGroup(ClusterInstance cluster) throws Exc
String[] args = buildArgsForGroup(cluster, group, "--all-topics",
"--to-current", "--execute");
try (ConsumerGroupCommand.ConsumerGroupService service =
getConsumerGroupService(args)) {
- // Make sure we got a coordinator
- TestUtils.waitForCondition(
- () ->
"localhost".equals(service.collectGroupState(group).coordinator.host()),
- "Can't find a coordinator");
Review Comment:
Why did you remove this wait for the coordinator?
##########
core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala:
##########
@@ -3405,15 +3407,21 @@ class GroupCoordinatorTest {
@Test
def testDescribeGroupWrongCoordinator(): Unit = {
- val (error, _) = groupCoordinator.handleDescribeGroup(otherGroupId)
+ val (error, _, _) = groupCoordinator.handleDescribeGroup(otherGroupId,
ApiKeys.DESCRIBE_GROUPS.latestVersion)
assertEquals(Errors.NOT_COORDINATOR, error)
}
@Test
def testDescribeGroupInactiveGroup(): Unit = {
- val (error, summary) = groupCoordinator.handleDescribeGroup(groupId)
+ val (error, errorMessage, summary) =
groupCoordinator.handleDescribeGroup(groupId, 5)
assertEquals(Errors.NONE, error)
+ assertTrue(errorMessage.isEmpty)
assertEquals(GroupCoordinator.DeadGroup, summary)
+
+ val (errorV6, errorMessageV6, summaryV6) =
groupCoordinator.handleDescribeGroup(groupId, 6)
+ assertEquals(Errors.GROUP_ID_NOT_FOUND, errorV6)
+ assertTrue(errorMessageV6.isDefined)
Review Comment:
nit: Should we verify the message too?
##########
core/src/test/scala/unit/kafka/server/DescribeGroupsRequestTest.scala:
##########
@@ -113,5 +113,48 @@ class DescribeGroupsRequestTest(cluster: ClusterInstance)
extends GroupCoordinat
)
)
}
+
+ for (version <- 6 to
ApiKeys.DESCRIBE_GROUPS.latestVersion(isUnstableApiEnabled)) {
+ assertEquals(
+ List(
+ new DescribedGroup()
+ .setGroupId("grp-1")
+ .setGroupState(ClassicGroupState.STABLE.toString)
+ .setProtocolType("consumer")
+ .setProtocolData("consumer-range")
+ .setMembers(List(
+ new DescribedGroupMember()
+ .setMemberId(memberId1)
+ .setGroupInstanceId(null)
+ .setClientId("client-id")
+ .setClientHost("/127.0.0.1")
+ .setMemberMetadata(Array(1, 2, 3))
+ .setMemberAssignment(Array(4, 5, 6))
+ ).asJava),
+ new DescribedGroup()
+ .setGroupId("grp-2")
+ .setGroupState(ClassicGroupState.COMPLETING_REBALANCE.toString)
+ .setProtocolType("consumer")
+ .setMembers(List(
+ new DescribedGroupMember()
+ .setMemberId(memberId2)
+ .setGroupInstanceId(null)
+ .setClientId("client-id")
+ .setClientHost("/127.0.0.1")
+ .setMemberMetadata(Array.empty)
+ .setMemberAssignment(Array.empty)
+ ).asJava),
+ new DescribedGroup()
+ .setGroupId("grp-unknown")
+ .setGroupState(ClassicGroupState.DEAD.toString) // Return DEAD
group when the group does not exist.
+ .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code())
+ .setErrorMessage("Group grp-unknown not found.")
Review Comment:
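Instead of duplicating the code, I wonder if we could set those two fields
based on the version, e.g.
`.setErrorCode(if (version >= 6) Errors.GROUP_ID_NOT_FOUND.code else Errors.NONE.code)`
and `.setErrorMessage(if (version >= 6) "Group grp-unknown not found." else null)`.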
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]