chia7712 commented on code in PR #21385:
URL: https://github.com/apache/kafka/pull/21385#discussion_r2765234237
##########
core/src/main/scala/kafka/admin/ConfigCommand.scala:
##########
@@ -537,6 +534,21 @@ object ConfigCommand extends Logging {
adminClient.describeClientQuotas(ClientQuotaFilter.containsOnly(components.asJava)).entities.get(30,
TimeUnit.SECONDS).asScala
}
+ private def listGroupConfigResources(adminClient: Admin):
Option[java.util.Collection[ConfigResource]] = {
+ try {
Review Comment:
```scala
try {
Some(adminClient.listConfigResources(java.util.Set.of(ConfigResource.Type.GROUP),
new ListConfigResourcesOptions).all.get)
} catch {
// (KIP-1142) 4.1+ admin client vs older broker: treat
UnsupportedVersion as None
case e: ExecutionException if
e.getCause.isInstanceOf[UnsupportedVersionException] => None
}
```
##########
tools/src/test/java/org/apache/kafka/tools/ConfigCommandTest.java:
##########
@@ -1413,6 +1416,33 @@ public void shouldNotAlterGroupConfigWithoutEntityName()
{
assertEquals("An entity name must be specified with --alter of
groups", exception.getMessage());
}
+ @Test
+ public void testDescribeGroupConfigOldBroker() {
+ ConfigCommand.ConfigCommandOptions describeOpts = new
ConfigCommand.ConfigCommandOptions(toArray("--bootstrap-server",
"localhost:9092",
+ "--entity-type", "groups",
+ "--describe"));
+
+ KafkaFutureImpl<Collection<ConfigResource>> future = new
KafkaFutureImpl<>();
+ ListConfigResourcesResult listConfigResourcesResult =
mock(ListConfigResourcesResult.class);
+ when(listConfigResourcesResult.all()).thenReturn(future);
+
+ AtomicBoolean listedConfigResources = new AtomicBoolean(false);
+ Node node = new Node(1, "localhost", 9092);
+ MockAdminClient mockAdminClient = new MockAdminClient(List.of(node),
node) {
+ @Override
+ public ListConfigResourcesResult
listConfigResources(Set<ConfigResource.Type> configResourceTypes,
ListConfigResourcesOptions options) {
+ ConfigResource.Type type =
configResourceTypes.iterator().next();
+ assertEquals(ConfigResource.Type.GROUP, type);
+ future.completeExceptionally(new
UnsupportedVersionException("The v0 ListConfigResources only supports
CLIENT_METRICS"));
Review Comment:
Should we add a test case for other exceptions to ensure they are propagated
correctly and not swallowed
##########
core/src/main/scala/kafka/admin/ConfigCommand.scala:
##########
@@ -388,8 +386,7 @@ object ConfigCommand extends Logging {
case ClientMetricsType =>
adminClient.listConfigResources(java.util.Set.of(ConfigResource.Type.CLIENT_METRICS),
new ListConfigResourcesOptions).all().get().asScala.map(_.name).toSeq
case GroupType =>
- adminClient.listGroups().all.get.asScala.map(_.groupId).toSet ++
-
adminClient.listConfigResources(java.util.Set.of(ConfigResource.Type.GROUP),
new ListConfigResourcesOptions).all().get().asScala.map(_.name).toSet
+ adminClient.listGroups().all.get.asScala.map(_.groupId).toSet ++
listGroupConfigResources(adminClient).map(resources =>
resources.asScala.map(_.name).toSet).getOrElse(Set() ++ entityName)
Review Comment:
`entityName` is `None`, so `Set() ++ entityName` could be replaced by
`Set.empty`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]