This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5cf6a9d80d2 MINOR: Cleanup Tools Module (1/n) (#20091)
5cf6a9d80d2 is described below
commit 5cf6a9d80d21671f9eb9b9a000e826b9487d19f2
Author: Sanskar Jhajharia <[email protected]>
AuthorDate: Tue Jul 22 15:44:25 2025 +0530
MINOR: Cleanup Tools Module (1/n) (#20091)
Now that Kafka supports Java 17, this PR makes some changes in the tools
module. The changes in this PR are limited to only some files; future
PR(s) will follow.
The changes mostly include:
- Collections.emptyList(), Collections.singletonList() and
Arrays.asList() are replaced with List.of()
- Collections.emptyMap() and Collections.singletonMap() are replaced
with Map.of()
- Collections.singleton() is replaced with Set.of()
Sub modules targeted: tools/src/main
Reviewers: Ken Huang <[email protected]>, Jhen-Yung Hsu
<[email protected]>, Chia-Ping Tsai <[email protected]>
---
.../java/org/apache/kafka/tools/AclCommand.java | 26 +++---
.../kafka/tools/ClientCompatibilityTest.java | 8 +-
.../apache/kafka/tools/ClientMetricsCommand.java | 20 ++---
.../java/org/apache/kafka/tools/ClusterTool.java | 6 +-
.../org/apache/kafka/tools/ConnectPluginPath.java | 26 +++---
.../apache/kafka/tools/ConsumerPerformance.java | 7 +-
.../apache/kafka/tools/DelegationTokenCommand.java | 3 +-
.../org/apache/kafka/tools/EndToEndLatency.java | 6 +-
.../org/apache/kafka/tools/FeatureCommand.java | 3 +-
.../org/apache/kafka/tools/GetOffsetShell.java | 5 +-
.../java/org/apache/kafka/tools/GroupsCommand.java | 7 +-
.../main/java/org/apache/kafka/tools/JmxTool.java | 10 +--
.../apache/kafka/tools/LeaderElectionCommand.java | 5 +-
.../org/apache/kafka/tools/ManifestWorkspace.java | 27 ++----
.../apache/kafka/tools/MetadataQuorumCommand.java | 56 ++++++-------
.../java/org/apache/kafka/tools/OffsetsUtils.java | 5 +-
.../kafka/tools/ReplicaVerificationTool.java | 4 +-
.../kafka/tools/ShareConsumerPerformance.java | 3 +-
.../org/apache/kafka/tools/StreamsResetter.java | 17 ++--
.../java/org/apache/kafka/tools/TopicCommand.java | 80 ++++++++----------
.../kafka/tools/TransactionalMessageCopier.java | 11 ++-
.../apache/kafka/tools/TransactionsCommand.java | 47 +++++------
.../org/apache/kafka/tools/VerifiableConsumer.java | 7 +-
.../kafka/tools/VerifiableShareConsumer.java | 3 +-
.../kafka/tools/consumer/ConsoleConsumer.java | 9 +-
.../tools/consumer/ConsoleConsumerOptions.java | 7 +-
.../kafka/tools/consumer/ConsoleShareConsumer.java | 3 +-
.../tools/consumer/group/ConsumerGroupCommand.java | 25 +++---
.../group/ConsumerGroupCommandOptions.java | 14 ++--
.../tools/consumer/group/ShareGroupCommand.java | 2 +-
.../consumer/group/ShareGroupCommandOptions.java | 12 ++-
.../tools/reassign/ReassignPartitionsCommand.java | 96 ++++++++++------------
.../tools/reassign/VerifyAssignmentResult.java | 3 +-
.../kafka/tools/streams/StreamsGroupCommand.java | 7 +-
.../tools/streams/StreamsGroupCommandOptions.java | 16 ++--
35 files changed, 264 insertions(+), 322 deletions(-)
diff --git a/tools/src/main/java/org/apache/kafka/tools/AclCommand.java
b/tools/src/main/java/org/apache/kafka/tools/AclCommand.java
index b6c0175331d..791397b2405 100644
--- a/tools/src/main/java/org/apache/kafka/tools/AclCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/AclCommand.java
@@ -40,9 +40,7 @@ import org.apache.kafka.server.util.CommandLineUtils;
import java.io.IOException;
import java.util.AbstractMap;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -177,7 +175,7 @@ public class AclCommand {
private static void removeAcls(Admin adminClient,
Set<AccessControlEntry> acls, ResourcePatternFilter filter) throws
ExecutionException, InterruptedException {
if (acls.isEmpty()) {
- adminClient.deleteAcls(Collections.singletonList(new
AclBindingFilter(filter, AccessControlEntryFilter.ANY))).all().get();
+ adminClient.deleteAcls(List.of(new AclBindingFilter(filter,
AccessControlEntryFilter.ANY))).all().get();
} else {
List<AclBindingFilter> aclBindingFilters =
acls.stream().map(acl -> new AclBindingFilter(filter,
acl.toFilter())).collect(Collectors.toList());
adminClient.deleteAcls(aclBindingFilters).all().get();
@@ -249,8 +247,8 @@ public class AclCommand {
Set<ResourcePatternFilter> transactionalIds =
filters.stream().filter(f -> f.resourceType() ==
ResourceType.TRANSACTIONAL_ID).collect(Collectors.toSet());
boolean enableIdempotence = opts.options.has(opts.idempotentOpt);
- Set<AccessControlEntry> topicAcls = getAcl(opts, new
HashSet<>(Arrays.asList(WRITE, DESCRIBE, CREATE)));
- Set<AccessControlEntry> transactionalIdAcls = getAcl(opts, new
HashSet<>(Arrays.asList(WRITE, DESCRIBE)));
+ Set<AccessControlEntry> topicAcls = getAcl(opts, Set.of(WRITE,
DESCRIBE, CREATE));
+ Set<AccessControlEntry> transactionalIdAcls = getAcl(opts,
Set.of(WRITE, DESCRIBE));
//Write, Describe, Create permission on topics, Write, Describe on
transactionalIds
Map<ResourcePatternFilter, Set<AccessControlEntry>> result = new
HashMap<>();
@@ -261,7 +259,7 @@ public class AclCommand {
result.put(transactionalId, transactionalIdAcls);
}
if (enableIdempotence) {
- result.put(CLUSTER_RESOURCE_FILTER, getAcl(opts,
Collections.singleton(IDEMPOTENT_WRITE)));
+ result.put(CLUSTER_RESOURCE_FILTER, getAcl(opts,
Set.of(IDEMPOTENT_WRITE)));
}
return result;
}
@@ -272,8 +270,8 @@ public class AclCommand {
Set<ResourcePatternFilter> groups = filters.stream().filter(f ->
f.resourceType() == ResourceType.GROUP).collect(Collectors.toSet());
//Read, Describe on topic, Read on consumerGroup
- Set<AccessControlEntry> topicAcls = getAcl(opts, new
HashSet<>(Arrays.asList(READ, DESCRIBE)));
- Set<AccessControlEntry> groupAcls = getAcl(opts,
Collections.singleton(READ));
+ Set<AccessControlEntry> topicAcls = getAcl(opts, Set.of(READ,
DESCRIBE));
+ Set<AccessControlEntry> groupAcls = getAcl(opts, Set.of(READ));
Map<ResourcePatternFilter, Set<AccessControlEntry>> result = new
HashMap<>();
for (ResourcePatternFilter topic : topics) {
@@ -333,9 +331,9 @@ public class AclCommand {
if (opts.options.has(hostOptionSpec)) {
return
opts.options.valuesOf(hostOptionSpec).stream().map(String::trim).collect(Collectors.toSet());
} else if (opts.options.has(principalOptionSpec)) {
- return Collections.singleton(AclEntry.WILDCARD_HOST);
+ return Set.of(AclEntry.WILDCARD_HOST);
} else {
- return Collections.emptySet();
+ return Set.of();
}
}
@@ -345,7 +343,7 @@ public class AclCommand {
.map(s -> SecurityUtils.parseKafkaPrincipal(s.trim()))
.collect(Collectors.toSet());
} else {
- return Collections.emptySet();
+ return Set.of();
}
}
@@ -547,7 +545,7 @@ public class AclCommand {
if (!options.has(bootstrapServerOpt) &&
!options.has(bootstrapControllerOpt)) {
CommandLineUtils.printUsageAndExit(parser, "One of
--bootstrap-server or --bootstrap-controller must be specified");
}
- List<AbstractOptionSpec<?>> mutuallyExclusiveOptions =
Arrays.asList(addOpt, removeOpt, listOpt);
+ List<AbstractOptionSpec<?>> mutuallyExclusiveOptions =
List.of(addOpt, removeOpt, listOpt);
long mutuallyExclusiveOptionsCount =
mutuallyExclusiveOptions.stream()
.filter(abstractOptionSpec ->
options.has(abstractOptionSpec))
.count();
@@ -592,10 +590,10 @@ public class AclCommand {
@Override
public String valuePattern() {
- List<PatternType> values = Arrays.asList(PatternType.values());
+ List<PatternType> values = List.of(PatternType.values());
List<PatternType> filteredValues = values.stream()
.filter(type -> type != PatternType.UNKNOWN)
- .collect(Collectors.toList());
+ .toList();
return filteredValues.stream()
.map(Object::toString)
.collect(Collectors.joining("|"));
diff --git
a/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
b/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
index b4f189cfa38..73c01b3ea1c 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
@@ -59,7 +59,6 @@ import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
@@ -67,6 +66,7 @@ import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Properties;
+import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@@ -291,13 +291,13 @@ public class ClientCompatibilityTest {
tryFeature("createTopics", testConfig.createTopicsSupported,
() -> {
try {
- client.createTopics(Collections.singleton(
+ client.createTopics(Set.of(
new NewTopic("newtopic", 1, (short)
1))).all().get();
} catch (ExecutionException e) {
throw e.getCause();
}
},
- () -> createTopicsResultTest(client,
Collections.singleton("newtopic"))
+ () -> createTopicsResultTest(client, Set.of("newtopic"))
);
while (true) {
@@ -337,7 +337,7 @@ public class ClientCompatibilityTest {
);
Map<ConfigResource, Config> brokerConfig =
-
client.describeConfigs(Collections.singleton(configResource)).all().get();
+
client.describeConfigs(Set.of(configResource)).all().get();
if (brokerConfig.get(configResource).entries().isEmpty()) {
throw new KafkaException("Expected to see config
entries, but got zero entries");
diff --git
a/tools/src/main/java/org/apache/kafka/tools/ClientMetricsCommand.java
b/tools/src/main/java/org/apache/kafka/tools/ClientMetricsCommand.java
index 6bd0f29d33d..6aec33b5938 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ClientMetricsCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ClientMetricsCommand.java
@@ -37,10 +37,10 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
@@ -91,11 +91,7 @@ public class ClientMetricsCommand {
}
} catch (ExecutionException e) {
Throwable cause = e.getCause();
- if (cause != null) {
- printException(cause);
- } else {
- printException(e);
- }
+ printException(Objects.requireNonNullElse(cause, e));
exitCode = 1;
} catch (Throwable t) {
printException(t);
@@ -130,8 +126,8 @@ public class ClientMetricsCommand {
Collection<AlterConfigOp> alterEntries =
configsToBeSet.entrySet().stream()
.map(entry -> new AlterConfigOp(new
ConfigEntry(entry.getKey(), entry.getValue()),
entry.getValue().isEmpty() ?
AlterConfigOp.OpType.DELETE : AlterConfigOp.OpType.SET))
- .collect(Collectors.toList());
-
adminClient.incrementalAlterConfigs(Collections.singletonMap(configResource,
alterEntries), alterOptions).all()
+ .toList();
+ adminClient.incrementalAlterConfigs(Map.of(configResource,
alterEntries), alterOptions).all()
.get(30, TimeUnit.SECONDS);
System.out.println("Altered client metrics config for " +
entityName + ".");
@@ -144,8 +140,8 @@ public class ClientMetricsCommand {
ConfigResource configResource = new
ConfigResource(ConfigResource.Type.CLIENT_METRICS, entityName);
AlterConfigsOptions alterOptions = new
AlterConfigsOptions().timeoutMs(30000).validateOnly(false);
Collection<AlterConfigOp> alterEntries = oldConfigs.stream()
- .map(entry -> new AlterConfigOp(entry,
AlterConfigOp.OpType.DELETE)).collect(Collectors.toList());
-
adminClient.incrementalAlterConfigs(Collections.singletonMap(configResource,
alterEntries), alterOptions)
+ .map(entry -> new AlterConfigOp(entry,
AlterConfigOp.OpType.DELETE)).toList();
+ adminClient.incrementalAlterConfigs(Map.of(configResource,
alterEntries), alterOptions)
.all().get(30, TimeUnit.SECONDS);
System.out.println("Deleted client metrics config for " +
entityName + ".");
@@ -162,7 +158,7 @@ public class ClientMetricsCommand {
System.out.println("The client metric resource " +
entityNameOpt.get() + " doesn't exist and doesn't have dynamic config.");
return;
}
- entities = Collections.singletonList(entityNameOpt.get());
+ entities = List.of(entityNameOpt.get());
} else {
Collection<ConfigResource> resources = adminClient
.listConfigResources(Set.of(ConfigResource.Type.CLIENT_METRICS), new
ListConfigResourcesOptions())
@@ -189,7 +185,7 @@ public class ClientMetricsCommand {
private Collection<ConfigEntry> getClientMetricsConfig(String
entityName) throws Exception {
ConfigResource configResource = new
ConfigResource(ConfigResource.Type.CLIENT_METRICS, entityName);
- Map<ConfigResource, Config> result =
adminClient.describeConfigs(Collections.singleton(configResource))
+ Map<ConfigResource, Config> result =
adminClient.describeConfigs(Set.of(configResource))
.all().get(30, TimeUnit.SECONDS);
return result.get(configResource).entries();
}
diff --git a/tools/src/main/java/org/apache/kafka/tools/ClusterTool.java
b/tools/src/main/java/org/apache/kafka/tools/ClusterTool.java
index ccffaeae0f1..3048179a4bc 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ClusterTool.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ClusterTool.java
@@ -32,8 +32,8 @@ import net.sourceforge.argparse4j.inf.Subparser;
import net.sourceforge.argparse4j.inf.Subparsers;
import java.io.PrintStream;
-import java.util.Arrays;
import java.util.Collection;
+import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
@@ -74,7 +74,7 @@ public class ClusterTool {
.help("Unregister a broker.");
Subparser listEndpoints = subparsers.addParser("list-endpoints")
.help("List endpoints");
- for (Subparser subpparser : Arrays.asList(clusterIdParser,
unregisterParser, listEndpoints)) {
+ for (Subparser subpparser : List.of(clusterIdParser, unregisterParser,
listEndpoints)) {
MutuallyExclusiveGroup connectionOptions =
subpparser.addMutuallyExclusiveGroup().required(true);
connectionOptions.addArgument("--bootstrap-server", "-b")
.action(store())
@@ -162,7 +162,7 @@ public class ClusterTool {
Collection<Node> nodes =
adminClient.describeCluster(option).nodes().get();
String maxHostLength = String.valueOf(nodes.stream().map(node ->
node.host().length()).max(Integer::compareTo).orElse(100));
- String maxRackLength = String.valueOf(nodes.stream().filter(node
-> node.hasRack()).map(node ->
node.rack().length()).max(Integer::compareTo).orElse(10));
+ String maxRackLength =
String.valueOf(nodes.stream().filter(Node::hasRack).map(node ->
node.rack().length()).max(Integer::compareTo).orElse(10));
if (listControllerEndpoints) {
String format = "%-10s %-" + maxHostLength + "s %-10s %-" +
maxRackLength + "s %-15s%n";
diff --git a/tools/src/main/java/org/apache/kafka/tools/ConnectPluginPath.java
b/tools/src/main/java/org/apache/kafka/tools/ConnectPluginPath.java
index 428bdf54a9d..a3543df76c7 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ConnectPluginPath.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ConnectPluginPath.java
@@ -42,7 +42,6 @@ import java.io.UncheckedIOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
@@ -149,14 +148,12 @@ public class ConnectPluginPath {
if (subcommand == null) {
throw new ArgumentParserException("No subcommand specified",
parser);
}
- switch (subcommand) {
- case "list":
- return new Config(Command.LIST, locations, false, false, out,
err);
- case "sync-manifests":
- return new Config(Command.SYNC_MANIFESTS, locations,
namespace.getBoolean("dry_run"), namespace.getBoolean("keep_not_found"), out,
err);
- default:
- throw new ArgumentParserException("Unrecognized subcommand: '"
+ subcommand + "'", parser);
- }
+ return switch (subcommand) {
+ case "list" -> new Config(Command.LIST, locations, false, false,
out, err);
+ case "sync-manifests" ->
+ new Config(Command.SYNC_MANIFESTS, locations,
namespace.getBoolean("dry_run"), namespace.getBoolean("keep_not_found"), out,
err);
+ default -> throw new ArgumentParserException("Unrecognized
subcommand: '" + subcommand + "'", parser);
+ };
}
private static Set<Path> parseLocations(ArgumentParser parser, Namespace
namespace) throws ArgumentParserException, TerseException {
@@ -197,7 +194,7 @@ public class ConnectPluginPath {
}
enum Command {
- LIST, SYNC_MANIFESTS;
+ LIST, SYNC_MANIFESTS
}
private static class Config {
@@ -326,11 +323,12 @@ public class ConnectPluginPath {
rowAliases.add(PluginUtils.prunedName(pluginDesc));
rows.add(newRow(workspace, pluginDesc.className(), new
ArrayList<>(rowAliases), pluginDesc.type(), pluginDesc.version(), true));
// If a corresponding manifest exists, mark it as loadable by
removing it from the map.
+ // TODO: The use of Collections here shall be fixed with
KAFKA-19524.
nonLoadableManifests.getOrDefault(pluginDesc.className(),
Collections.emptySet()).remove(pluginDesc.type());
});
nonLoadableManifests.forEach((className, types) -> types.forEach(type
-> {
// All manifests which remain in the map are not loadable
- rows.add(newRow(workspace, className, Collections.emptyList(),
type, PluginDesc.UNDEFINED_VERSION, false));
+ rows.add(newRow(workspace, className, List.of(), type,
PluginDesc.UNDEFINED_VERSION, false));
}));
return rows;
}
@@ -436,8 +434,8 @@ public class ConnectPluginPath {
}
private static PluginScanResult discoverPlugins(PluginSource source,
ReflectionScanner reflectionScanner, ServiceLoaderScanner serviceLoaderScanner)
{
- PluginScanResult serviceLoadResult =
serviceLoaderScanner.discoverPlugins(Collections.singleton(source));
- PluginScanResult reflectiveResult =
reflectionScanner.discoverPlugins(Collections.singleton(source));
- return new PluginScanResult(Arrays.asList(serviceLoadResult,
reflectiveResult));
+ PluginScanResult serviceLoadResult =
serviceLoaderScanner.discoverPlugins(Set.of(source));
+ PluginScanResult reflectiveResult =
reflectionScanner.discoverPlugins(Set.of(source));
+ return new PluginScanResult(List.of(serviceLoadResult,
reflectiveResult));
}
}
diff --git
a/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java
b/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java
index 0a5106d0e9c..0bbc953293f 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java
@@ -37,7 +37,6 @@ import java.io.IOException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Collection;
-import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
@@ -221,8 +220,8 @@ public class ConsumerPerformance {
}
public static class ConsumerPerfRebListener implements
ConsumerRebalanceListener {
- private AtomicLong joinTimeMs;
- private AtomicLong joinTimeMsInSingleRound;
+ private final AtomicLong joinTimeMs;
+ private final AtomicLong joinTimeMsInSingleRound;
private long joinStartMs;
public ConsumerPerfRebListener(AtomicLong joinTimeMs, long
joinStartMs, AtomicLong joinTimeMsInSingleRound) {
@@ -355,7 +354,7 @@ public class ConsumerPerformance {
}
public Set<String> topic() {
- return Collections.singleton(options.valueOf(topicOpt));
+ return Set.of(options.valueOf(topicOpt));
}
public long numMessages() {
diff --git
a/tools/src/main/java/org/apache/kafka/tools/DelegationTokenCommand.java
b/tools/src/main/java/org/apache/kafka/tools/DelegationTokenCommand.java
index 04162041f8e..dad388fee6b 100644
--- a/tools/src/main/java/org/apache/kafka/tools/DelegationTokenCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/DelegationTokenCommand.java
@@ -38,7 +38,6 @@ import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Base64;
-import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.Set;
@@ -106,7 +105,7 @@ public class DelegationTokenCommand {
CreateDelegationTokenResult createResult =
adminClient.createDelegationToken(createDelegationTokenOptions);
DelegationToken token = createResult.delegationToken().get();
System.out.println("Created delegation token with tokenId : " +
token.tokenInfo().tokenId());
- printToken(Collections.singletonList(token));
+ printToken(List.of(token));
return token;
}
diff --git a/tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java
b/tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java
index 96aed36f37e..c6914b4667b 100644
--- a/tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java
+++ b/tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java
@@ -34,11 +34,11 @@ import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Arrays;
-import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.Random;
+import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
@@ -90,7 +90,7 @@ public class EndToEndLatency {
int messageSizeBytes = Integer.parseInt(args[4]);
Optional<String> propertiesFile = (args.length > 5 &&
!Utils.isBlank(args[5])) ? Optional.of(args[5]) : Optional.empty();
- if (!Arrays.asList("1", "all").contains(acks)) {
+ if (!List.of("1", "all").contains(acks)) {
throw new IllegalArgumentException("Latency testing requires
synchronous acknowledgement. Please use 1 or all");
}
@@ -186,7 +186,7 @@ public class EndToEndLatency {
Admin adminClient = Admin.create(adminProps);
NewTopic newTopic = new NewTopic(topic, DEFAULT_NUM_PARTITIONS,
DEFAULT_REPLICATION_FACTOR);
try {
-
adminClient.createTopics(Collections.singleton(newTopic)).all().get();
+ adminClient.createTopics(Set.of(newTopic)).all().get();
} catch (ExecutionException | InterruptedException e) {
System.out.printf("Creation of topic %s failed%n", topic);
throw new RuntimeException(e);
diff --git a/tools/src/main/java/org/apache/kafka/tools/FeatureCommand.java
b/tools/src/main/java/org/apache/kafka/tools/FeatureCommand.java
index ca0ce8c4ac0..103821cf21d 100644
--- a/tools/src/main/java/org/apache/kafka/tools/FeatureCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/FeatureCommand.java
@@ -39,7 +39,6 @@ import net.sourceforge.argparse4j.inf.Subparser;
import net.sourceforge.argparse4j.inf.Subparsers;
import net.sourceforge.argparse4j.internal.HelpScreenException;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -240,7 +239,7 @@ public class FeatureCommand {
}
static String metadataVersionsToString(MetadataVersion first,
MetadataVersion last) {
- List<MetadataVersion> versions =
Arrays.asList(MetadataVersion.VERSIONS).subList(first.ordinal(), last.ordinal()
+ 1);
+ List<MetadataVersion> versions =
List.of(MetadataVersion.VERSIONS).subList(first.ordinal(), last.ordinal() + 1);
return versions.stream()
.map(String::valueOf)
.collect(Collectors.joining(", "));
diff --git a/tools/src/main/java/org/apache/kafka/tools/GetOffsetShell.java
b/tools/src/main/java/org/apache/kafka/tools/GetOffsetShell.java
index a2dafcdef7a..4b46f336087 100644
--- a/tools/src/main/java/org/apache/kafka/tools/GetOffsetShell.java
+++ b/tools/src/main/java/org/apache/kafka/tools/GetOffsetShell.java
@@ -42,7 +42,6 @@ import
org.apache.kafka.tools.filter.TopicPartitionFilter.TopicFilterAndPartitio
import java.io.IOException;
import java.util.Arrays;
-import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
@@ -312,7 +311,7 @@ public class GetOffsetShell {
* PartitionPattern: NUMBER | NUMBER-(NUMBER)? | -NUMBER
*/
public TopicPartitionFilter
createTopicPartitionFilterWithPatternList(String topicPartitions) {
- List<String> ruleSpecs = Arrays.asList(topicPartitions.split(","));
+ List<String> ruleSpecs = List.of(topicPartitions.split(","));
List<TopicPartitionFilter> rules = ruleSpecs.stream().map(ruleSpec -> {
try {
return parseRuleSpec(ruleSpec);
@@ -338,7 +337,7 @@ public class GetOffsetShell {
Set<Integer> partitions;
if (partitionsString == null || partitionsString.isEmpty()) {
- partitions = Collections.emptySet();
+ partitions = Set.of();
} else {
try {
partitions =
Arrays.stream(partitionsString.split(",")).map(Integer::parseInt).collect(Collectors.toSet());
diff --git a/tools/src/main/java/org/apache/kafka/tools/GroupsCommand.java
b/tools/src/main/java/org/apache/kafka/tools/GroupsCommand.java
index c79826e7092..d4b933c9f6c 100644
--- a/tools/src/main/java/org/apache/kafka/tools/GroupsCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/GroupsCommand.java
@@ -33,6 +33,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
@@ -75,11 +76,7 @@ public class GroupsCommand {
}
} catch (ExecutionException e) {
Throwable cause = e.getCause();
- if (cause != null) {
- printException(cause);
- } else {
- printException(e);
- }
+ printException(Objects.requireNonNullElse(cause, e));
exitCode = 1;
} catch (Throwable t) {
printException(t);
diff --git a/tools/src/main/java/org/apache/kafka/tools/JmxTool.java
b/tools/src/main/java/org/apache/kafka/tools/JmxTool.java
index 56cf8d85ab8..c87b15aadf8 100644
--- a/tools/src/main/java/org/apache/kafka/tools/JmxTool.java
+++ b/tools/src/main/java/org/apache/kafka/tools/JmxTool.java
@@ -96,7 +96,7 @@ public class JmxTool {
while (keepGoing) {
long start = System.currentTimeMillis();
Map<String, Object> attributes = queryAttributes(conn, found,
attributesInclude);
- attributes.put("time", dateFormat.isPresent() ?
dateFormat.get().format(new Date()) :
String.valueOf(System.currentTimeMillis()));
+ attributes.put("time", dateFormat.map(format ->
format.format(new Date())).orElseGet(() ->
String.valueOf(System.currentTimeMillis())));
maybePrintDataRows(reportFormat, numExpectedAttributes, keys,
attributes);
if (options.isOneTime()) {
keepGoing = false;
@@ -225,7 +225,7 @@ public class JmxTool {
AttributeList attributes =
conn.getAttributes(objectName, attributesNames(mBeanInfo));
List<ObjectName> expectedAttributes = new
ArrayList<>();
attributes.asList().forEach(attribute -> {
- if
(Arrays.asList(attributesInclude.get()).contains(attribute.getName())) {
+ if
(List.of(attributesInclude.get()).contains(attribute.getName())) {
expectedAttributes.add(objectName);
}
});
@@ -254,10 +254,10 @@ public class JmxTool {
for (ObjectName objectName : objectNames) {
MBeanInfo beanInfo = conn.getMBeanInfo(objectName);
AttributeList attributes = conn.getAttributes(objectName,
- Arrays.stream(beanInfo.getAttributes()).map(a ->
a.getName()).toArray(String[]::new));
+
Arrays.stream(beanInfo.getAttributes()).map(MBeanFeatureInfo::getName).toArray(String[]::new));
for (Attribute attribute : attributes.asList()) {
if (attributesInclude.isPresent()) {
- if
(Arrays.asList(attributesInclude.get()).contains(attribute.getName())) {
+ if
(List.of(attributesInclude.get()).contains(attribute.getName())) {
result.put(String.format("%s:%s",
objectName.toString(), attribute.getName()),
attribute.getValue());
}
@@ -395,7 +395,7 @@ public class JmxTool {
private String parseFormat() {
String reportFormat =
options.valueOf(reportFormatOpt).toLowerCase(Locale.ROOT);
- return Arrays.asList("properties", "csv",
"tsv").contains(reportFormat) ? reportFormat : "original";
+ return List.of("properties", "csv", "tsv").contains(reportFormat)
? reportFormat : "original";
}
public boolean hasJmxAuthPropOpt() {
diff --git
a/tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java
b/tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java
index 2814034e167..2b0f9c4d7b9 100644
--- a/tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java
@@ -40,7 +40,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -93,7 +92,7 @@ public class LeaderElectionCommand {
Optional<Integer> partitionOption =
Optional.ofNullable(commandOptions.getPartition());
final Optional<Set<TopicPartition>> singleTopicPartition =
(topicOption.isPresent() && partitionOption.isPresent()) ?
- Optional.of(Collections.singleton(new
TopicPartition(topicOption.get(), partitionOption.get()))) :
+ Optional.of(Set.of(new TopicPartition(topicOption.get(),
partitionOption.get()))) :
Optional.empty();
/* Note: No need to look at --all-topic-partitions as we want this to
be null if it is use.
@@ -346,7 +345,7 @@ public class LeaderElectionCommand {
}
// One and only one is required: --topic, --all-topic-partitions
or --path-to-json-file
- List<AbstractOptionSpec<?>> mutuallyExclusiveOptions =
Arrays.asList(
+ List<AbstractOptionSpec<?>> mutuallyExclusiveOptions = List.of(
topic,
allTopicPartitions,
pathToJsonFile
diff --git a/tools/src/main/java/org/apache/kafka/tools/ManifestWorkspace.java
b/tools/src/main/java/org/apache/kafka/tools/ManifestWorkspace.java
index d600c5df536..305a91cfdc4 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ManifestWorkspace.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ManifestWorkspace.java
@@ -45,7 +45,6 @@ import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
@@ -83,23 +82,13 @@ public class ManifestWorkspace {
}
public SourceWorkspace<?> forSource(PluginSource source) throws
IOException {
- SourceWorkspace<?> sourceWorkspace;
- switch (source.type()) {
- case CLASSPATH:
- sourceWorkspace = new ClasspathWorkspace(source);
- break;
- case MULTI_JAR:
- sourceWorkspace = new MultiJarWorkspace(source);
- break;
- case SINGLE_JAR:
- sourceWorkspace = new SingleJarWorkspace(source);
- break;
- case CLASS_HIERARCHY:
- sourceWorkspace = new ClassHierarchyWorkspace(source);
- break;
- default:
- throw new IllegalStateException("Unknown source type " +
source.type());
- }
+ SourceWorkspace<?> sourceWorkspace = switch (source.type()) {
+ case CLASSPATH -> new ClasspathWorkspace(source);
+ case MULTI_JAR -> new MultiJarWorkspace(source);
+ case SINGLE_JAR -> new SingleJarWorkspace(source);
+ case CLASS_HIERARCHY -> new ClassHierarchyWorkspace(source);
+ default -> throw new IllegalStateException("Unknown source type "
+ source.type());
+ };
workspaces.add(sourceWorkspace);
return sourceWorkspace;
}
@@ -390,7 +379,7 @@ public class ManifestWorkspace {
}
try (FileSystem jar = FileSystems.newFileSystem(
new URI("jar", writableJar.toUri().toString(), ""),
- Collections.emptyMap()
+ Map.of()
)) {
Path zipRoot = jar.getRootDirectories().iterator().next();
// Set dryRun to false because this jar file is already a writable
copy.
diff --git
a/tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java
b/tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java
index 322e0a7bac3..b65655a8c89 100644
--- a/tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java
@@ -50,7 +50,6 @@ import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashSet;
@@ -127,35 +126,36 @@ public class MetadataQuorumCommand {
Optional.ofNullable(namespace.getString("bootstrap_controller")));
admin = Admin.create(props);
- if (command.equals("describe")) {
- if (namespace.getBoolean("status") &&
namespace.getBoolean("replication")) {
- throw new TerseException("Only one of --status or
--replication should be specified with describe sub-command");
- } else if (namespace.getBoolean("replication")) {
- boolean humanReadable =
Optional.of(namespace.getBoolean("human_readable")).orElse(false);
- handleDescribeReplication(admin, humanReadable);
- } else if (namespace.getBoolean("status")) {
- if (namespace.getBoolean("human_readable")) {
- throw new TerseException("The option --human-readable
is only supported along with --replication");
+ switch (command) {
+ case "describe" -> {
+ if (namespace.getBoolean("status") &&
namespace.getBoolean("replication")) {
+ throw new TerseException("Only one of --status or
--replication should be specified with describe sub-command");
+ } else if (namespace.getBoolean("replication")) {
+ boolean humanReadable =
Optional.of(namespace.getBoolean("human_readable")).orElse(false);
+ handleDescribeReplication(admin, humanReadable);
+ } else if (namespace.getBoolean("status")) {
+ if (namespace.getBoolean("human_readable")) {
+ throw new TerseException("The option
--human-readable is only supported along with --replication");
+ }
+ handleDescribeStatus(admin);
+ } else {
+ throw new TerseException("One of --status or
--replication must be specified with describe sub-command");
}
- handleDescribeStatus(admin);
- } else {
- throw new TerseException("One of --status or --replication
must be specified with describe sub-command");
}
- } else if (command.equals("add-controller")) {
- if (optionalCommandConfig == null) {
- throw new TerseException("You must supply the
configuration file of the controller you are " +
- "adding when using add-controller.");
+ case "add-controller" -> {
+ if (optionalCommandConfig == null) {
+ throw new TerseException("You must supply the
configuration file of the controller you are " +
+ "adding when using add-controller.");
+ }
+ handleAddController(admin,
+ namespace.getBoolean("dry_run"),
+ props);
}
- handleAddController(admin,
- namespace.getBoolean("dry_run"),
- props);
- } else if (command.equals("remove-controller")) {
- handleRemoveController(admin,
+ case "remove-controller" -> handleRemoveController(admin,
namespace.getInt("controller_id"),
namespace.getString("controller_directory_id"),
namespace.getBoolean("dry_run"));
- } else {
- throw new IllegalStateException(format("Unknown command: %s",
command));
+ default -> throw new IllegalStateException(format("Unknown
command: %s", command));
}
} finally {
if (admin != null)
@@ -231,7 +231,7 @@ public class MetadataQuorumCommand {
lastFetchTimestamp,
lastCaughtUpTimestamp,
status
- ).map(r -> r.toString()).collect(Collectors.toList());
+ ).map(Object::toString).collect(Collectors.toList());
}).collect(Collectors.toList());
}
@@ -253,7 +253,7 @@ public class MetadataQuorumCommand {
QuorumInfo quorumInfo =
admin.describeMetadataQuorum().quorumInfo().get();
int leaderId = quorumInfo.leaderId();
QuorumInfo.ReplicaState leader =
quorumInfo.voters().stream().filter(voter -> voter.replicaId() ==
leaderId).findFirst().get();
- QuorumInfo.ReplicaState maxLagFollower =
quorumInfo.voters().stream().min(Comparator.comparingLong(qi ->
qi.logEndOffset())).get();
+ QuorumInfo.ReplicaState maxLagFollower =
quorumInfo.voters().stream().min(Comparator.comparingLong(QuorumInfo.ReplicaState::logEndOffset)).get();
long maxFollowerLag = leader.logEndOffset() -
maxLagFollower.logEndOffset();
long maxFollowerLagTimeMs;
@@ -292,7 +292,7 @@ public class MetadataQuorumCommand {
List<Node> currentVoterList = replicas.stream().map(voter -> new Node(
voter.replicaId(),
voter.replicaDirectoryId(),
-
getEndpoints(quorumInfo.nodes().get(voter.replicaId())))).collect(Collectors.toList());
+ getEndpoints(quorumInfo.nodes().get(voter.replicaId())))).toList();
return
currentVoterList.stream().map(Objects::toString).collect(Collectors.joining(",
", "[", "]"));
}
@@ -378,7 +378,7 @@ public class MetadataQuorumCommand {
static Uuid getMetadataDirectoryId(String metadataDirectory) throws
Exception {
MetaPropertiesEnsemble ensemble = new MetaPropertiesEnsemble.Loader().
- addLogDirs(Collections.singletonList(metadataDirectory)).
+ addLogDirs(List.of(metadataDirectory)).
addMetadataLogDir(metadataDirectory).
load();
MetaProperties metaProperties =
ensemble.logDirProps().get(metadataDirectory);
diff --git a/tools/src/main/java/org/apache/kafka/tools/OffsetsUtils.java
b/tools/src/main/java/org/apache/kafka/tools/OffsetsUtils.java
index 4cbb3329aa1..c529e4b6820 100644
--- a/tools/src/main/java/org/apache/kafka/tools/OffsetsUtils.java
+++ b/tools/src/main/java/org/apache/kafka/tools/OffsetsUtils.java
@@ -42,7 +42,6 @@ import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -367,7 +366,7 @@ public class OffsetsUtils {
if (resetPlanForGroup == null) {
printError("No reset plan for group " + groupId + " found",
Optional.empty());
- return Collections.<TopicPartition,
OffsetAndMetadata>emptyMap();
+ return Map.<TopicPartition, OffsetAndMetadata>of();
}
Map<TopicPartition, Long> requestedOffsets =
resetPlanForGroup.keySet().stream().collect(Collectors.toMap(
@@ -376,7 +375,7 @@ public class OffsetsUtils {
return checkOffsetsRange(requestedOffsets).entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> new
OffsetAndMetadata(e.getValue())));
- }).orElseGet(Collections::emptyMap);
+ }).orElseGet(Map::of);
}
public Map<TopicPartition, OffsetAndMetadata>
resetToCurrent(Collection<TopicPartition> partitionsToReset,
Map<TopicPartition, OffsetAndMetadata> currentCommittedOffsets) {
diff --git
a/tools/src/main/java/org/apache/kafka/tools/ReplicaVerificationTool.java
b/tools/src/main/java/org/apache/kafka/tools/ReplicaVerificationTool.java
index 67ec1f6c50a..c5f1ddcc2d4 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ReplicaVerificationTool.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ReplicaVerificationTool.java
@@ -127,7 +127,7 @@ public class ReplicaVerificationTool {
List<TopicDescription> filteredTopicMetadata =
topicsMetadata.stream().filter(
topicMetadata ->
options.topicsIncludeFilter().isTopicAllowed(topicMetadata.name(), false)
- ).collect(Collectors.toList());
+ ).toList();
if (filteredTopicMetadata.isEmpty()) {
LOG.error("No topics found. {} if specified, is either
filtering out all topics or there is no topic.", options.topicsIncludeOpt);
@@ -196,7 +196,7 @@ public class ReplicaVerificationTool {
counter.incrementAndGet()
);
})
- .collect(Collectors.toList());
+ .toList();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
LOG.info("Stopping all fetchers");
diff --git
a/tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java
b/tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java
index b04bf922d04..9e3f0ff0152 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java
@@ -37,7 +37,6 @@ import java.io.IOException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -396,7 +395,7 @@ public class ShareConsumerPerformance {
}
public Set<String> topic() {
- return Collections.singleton(options.valueOf(topicOpt));
+ return Set.of(options.valueOf(topicOpt));
}
public long numMessages() {
diff --git a/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java
b/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java
index d15d7dcde13..7047b77c180 100644
--- a/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java
+++ b/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java
@@ -44,7 +44,6 @@ import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
@@ -178,7 +177,7 @@ public class StreamsResetter {
final StreamsResetterOptions
options)
throws ExecutionException, InterruptedException {
final DescribeConsumerGroupsResult describeResult =
adminClient.describeConsumerGroups(
- Collections.singleton(groupId),
+ Set.of(groupId),
new DescribeConsumerGroupsOptions().timeoutMs(10 * 1000));
try {
final List<MemberDescription> members =
@@ -212,15 +211,15 @@ public class StreamsResetter {
final List<String> notFoundInputTopics = new ArrayList<>();
final List<String> notFoundIntermediateTopics = new ArrayList<>();
- if (inputTopics.size() == 0 && intermediateTopics.size() == 0) {
+ if (inputTopics.isEmpty() && intermediateTopics.isEmpty()) {
System.out.println("No input or intermediate topics specified.
Skipping seek.");
return EXIT_CODE_SUCCESS;
}
- if (inputTopics.size() != 0) {
+ if (!inputTopics.isEmpty()) {
System.out.println("Reset-offsets for input topics " +
inputTopics);
}
- if (intermediateTopics.size() != 0) {
+ if (!intermediateTopics.isEmpty()) {
System.out.println("Seek-to-end for intermediate topics " +
intermediateTopics);
}
@@ -313,7 +312,7 @@ public class StreamsResetter {
public void maybeSeekToEnd(final String groupId,
final Consumer<byte[], byte[]> client,
final Set<TopicPartition>
intermediateTopicPartitions) {
- if (intermediateTopicPartitions.size() > 0) {
+ if (!intermediateTopicPartitions.isEmpty()) {
System.out.println("Following intermediate topics offsets will be
reset to end (for consumer group " + groupId + ")");
for (final TopicPartition topicPartition :
intermediateTopicPartitions) {
if (allTopics.contains(topicPartition.topic())) {
@@ -328,7 +327,7 @@ public class StreamsResetter {
final Set<TopicPartition> inputTopicPartitions,
final StreamsResetterOptions options)
throws IOException, ParseException {
- if (inputTopicPartitions.size() > 0) {
+ if (!inputTopicPartitions.isEmpty()) {
System.out.println("Following input topics offsets will be reset
to (for consumer group " + options.applicationId() + ")");
if (options.hasToOffset()) {
resetOffsetsTo(client, inputTopicPartitions,
options.toOffset());
@@ -405,7 +404,7 @@ public class StreamsResetter {
if (partitionOffset.isPresent()) {
client.seek(topicPartition, partitionOffset.get());
} else {
- client.seekToEnd(Collections.singletonList(topicPartition));
+ client.seekToEnd(List.of(topicPartition));
System.out.println("Partition " + topicPartition.partition() +
" from topic " + topicPartition.topic() +
" is empty, without a committed record. Falling back
to latest known offset.");
}
@@ -508,7 +507,7 @@ public class StreamsResetter {
final List<String> topicsToDelete;
if (!specifiedInternalTopics.isEmpty()) {
- if (!inferredInternalTopics.containsAll(specifiedInternalTopics)) {
+ if (!new
HashSet<>(inferredInternalTopics).containsAll(specifiedInternalTopics)) {
throw new IllegalArgumentException("Invalid topic specified in
the "
+ "--internal-topics option. "
+ "Ensure that the topics specified are all internal
topics. "
diff --git a/tools/src/main/java/org/apache/kafka/tools/TopicCommand.java
b/tools/src/main/java/org/apache/kafka/tools/TopicCommand.java
index 4da7f47e564..b4444333500 100644
--- a/tools/src/main/java/org/apache/kafka/tools/TopicCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/TopicCommand.java
@@ -59,18 +59,19 @@ import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import joptsimple.ArgumentAcceptingOptionSpec;
import joptsimple.OptionSpec;
@@ -112,11 +113,7 @@ public abstract class TopicCommand {
}
} catch (ExecutionException e) {
Throwable cause = e.getCause();
- if (cause != null) {
- printException(cause);
- } else {
- printException(e);
- }
+ printException(Objects.requireNonNullElse(cause, e));
exitCode = 1;
} catch (Throwable e) {
printException(e);
@@ -159,10 +156,10 @@ public abstract class TopicCommand {
@SuppressWarnings("deprecation")
private static Properties parseTopicConfigsToBeAdded(TopicCommandOptions
opts) {
- List<List<String>> configsToBeAdded =
opts.topicConfig().orElse(Collections.emptyList())
+ List<List<String>> configsToBeAdded =
opts.topicConfig().orElse(List.of())
.stream()
- .map(s -> Arrays.asList(s.split("\\s*=\\s*")))
- .collect(Collectors.toList());
+ .map(s -> List.of(s.split("\\s*=\\s*")))
+ .toList();
if (!configsToBeAdded.stream().allMatch(config -> config.size() == 2))
{
throw new IllegalArgumentException("requirement failed: Invalid
topic config: all configs to be added must be in the format \"key=val\".");
@@ -256,7 +253,7 @@ public abstract class TopicCommand {
name = options.topic().get();
partitions = options.partitions();
replicationFactor = options.replicationFactor();
- replicaAssignment =
options.replicaAssignment().orElse(Collections.emptyMap());
+ replicaAssignment = options.replicaAssignment().orElse(Map.of());
configsToAdd = parseTopicConfigsToBeAdded(options);
}
@@ -357,10 +354,10 @@ public abstract class TopicCommand {
.collect(Collectors.joining(",")));
if (reassignment != null) {
System.out.print("\tAdding Replicas: " +
reassignment.addingReplicas().stream()
- .map(node -> node.toString())
+ .map(Object::toString)
.collect(Collectors.joining(",")));
System.out.print("\tRemoving Replicas: " +
reassignment.removingReplicas().stream()
- .map(node -> node.toString())
+ .map(Object::toString)
.collect(Collectors.joining(",")));
}
@@ -443,9 +440,7 @@ public abstract class TopicCommand {
}
private static Admin createAdminClient(Properties commandConfig,
Optional<String> bootstrapServer) {
- if (bootstrapServer.isPresent()) {
- commandConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServer.get());
- }
+ bootstrapServer.ifPresent(s ->
commandConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, s));
return Admin.create(commandConfig);
}
@@ -475,10 +470,10 @@ public abstract class TopicCommand {
}
Map<String, String> configsMap =
topic.configsToAdd.stringPropertyNames().stream()
- .collect(Collectors.toMap(name -> name, name ->
topic.configsToAdd.getProperty(name)));
+ .collect(Collectors.toMap(name -> name,
topic.configsToAdd::getProperty));
newTopic.configs(configsMap);
- CreateTopicsResult createResult =
adminClient.createTopics(Collections.singleton(newTopic),
+ CreateTopicsResult createResult =
adminClient.createTopics(Set.of(newTopic),
new CreateTopicsOptions().retryOnQuotaViolation(false));
createResult.all().get();
System.out.println("Created topic " + topic.name + ".");
@@ -493,9 +488,7 @@ public abstract class TopicCommand {
}
public void listTopics(TopicCommandOptions opts) throws
ExecutionException, InterruptedException {
- String results = getTopics(opts.topic(),
opts.excludeInternalTopics())
- .stream()
- .collect(Collectors.joining("\n"));
+ String results = String.join("\n", getTopics(opts.topic(),
opts.excludeInternalTopics()));
System.out.println(results);
}
@@ -539,7 +532,7 @@ public abstract class TopicCommand {
Throwable cause = e.getCause();
if (cause instanceof UnsupportedVersionException || cause
instanceof ClusterAuthorizationException) {
LOG.debug("Couldn't query reassignments through the
AdminClient API: " + cause.getMessage(), cause);
- return Collections.emptyMap();
+ return Map.of();
} else {
throw new RuntimeException(e);
}
@@ -558,9 +551,9 @@ public abstract class TopicCommand {
List<String> topics;
if (useTopicId) {
topicIds = getTopicIds(inputTopicId.get(),
opts.excludeInternalTopics());
- topics = Collections.emptyList();
+ topics = List.of();
} else {
- topicIds = Collections.emptyList();
+ topicIds = List.of();
topics = getTopics(opts.topic(), opts.excludeInternalTopics());
}
@@ -588,7 +581,7 @@ public abstract class TopicCommand {
List<String> topicNames = topicDescriptions.stream()
.map(org.apache.kafka.clients.admin.TopicDescription::name)
- .collect(Collectors.toList());
+ .toList();
Map<ConfigResource, KafkaFuture<Config>> allConfigs =
adminClient.describeConfigs(
topicNames.stream()
.map(name -> new ConfigResource(ConfigResource.Type.TOPIC,
name))
@@ -596,7 +589,7 @@ public abstract class TopicCommand {
).values();
List<Integer> liveBrokers =
adminClient.describeCluster().nodes().get().stream()
.map(Node::id)
- .collect(Collectors.toList());
+ .toList();
DescribeOptions describeOptions = new DescribeOptions(opts, new
HashSet<>(liveBrokers));
Set<TopicPartition> topicPartitions = topicDescriptions
.stream()
@@ -647,7 +640,7 @@ public abstract class TopicCommand {
public void deleteTopic(TopicCommandOptions opts) throws
ExecutionException, InterruptedException {
List<String> topics = getTopics(opts.topic(),
opts.excludeInternalTopics());
ensureTopicExists(topics, opts.topic(), !opts.ifExists());
- adminClient.deleteTopics(Collections.unmodifiableList(topics),
+ adminClient.deleteTopics(List.copyOf(topics),
new DeleteTopicsOptions().retryOnQuotaViolation(false)
).all().get();
}
@@ -668,10 +661,10 @@ public abstract class TopicCommand {
List<Uuid> allTopicIds = allTopics.listings().get().stream()
.map(TopicListing::topicId)
.sorted()
- .collect(Collectors.toList());
+ .toList();
return allTopicIds.contains(topicIdIncludeList) ?
- Collections.singletonList(topicIdIncludeList) :
- Collections.emptyList();
+ List.of(topicIdIncludeList) :
+ List.of();
}
@Override
@@ -835,7 +828,7 @@ public abstract class TopicCommand {
}
public <A> Optional<List<A>> valuesAsOption(OptionSpec<A> option) {
- return valuesAsOption(option, Collections.emptyList());
+ return valuesAsOption(option, List.of());
}
public <A> Optional<A> valueAsOption(OptionSpec<A> option, Optional<A>
defaultValue) {
@@ -953,8 +946,7 @@ public abstract class TopicCommand {
// should have exactly one action
long actions =
- Arrays.asList(createOpt, listOpt, alterOpt, describeOpt,
deleteOpt)
- .stream().filter(options::has)
+ Stream.of(createOpt, listOpt, alterOpt, describeOpt,
deleteOpt).filter(options::has)
.count();
if (actions != 1)
CommandLineUtils.printUsageAndExit(parser, "Command must
include exactly one action: --list, --describe, --create, --alter or --delete");
@@ -989,29 +981,29 @@ public abstract class TopicCommand {
private void checkInvalidArgs() {
// check invalid args
- CommandLineUtils.checkInvalidArgs(parser, options, configOpt,
invalidOptions(Arrays.asList(alterOpt, createOpt)));
- CommandLineUtils.checkInvalidArgs(parser, options, partitionsOpt,
invalidOptions(Arrays.asList(alterOpt, createOpt)));
- CommandLineUtils.checkInvalidArgs(parser, options,
replicationFactorOpt, invalidOptions(Arrays.asList(createOpt)));
- CommandLineUtils.checkInvalidArgs(parser, options,
replicaAssignmentOpt, invalidOptions(Arrays.asList(alterOpt, createOpt)));
+ CommandLineUtils.checkInvalidArgs(parser, options, configOpt,
invalidOptions(List.of(alterOpt, createOpt)));
+ CommandLineUtils.checkInvalidArgs(parser, options, partitionsOpt,
invalidOptions(List.of(alterOpt, createOpt)));
+ CommandLineUtils.checkInvalidArgs(parser, options,
replicationFactorOpt, invalidOptions(List.of(createOpt)));
+ CommandLineUtils.checkInvalidArgs(parser, options,
replicaAssignmentOpt, invalidOptions(List.of(alterOpt, createOpt)));
if (options.has(createOpt)) {
CommandLineUtils.checkInvalidArgs(parser, options,
replicaAssignmentOpt, partitionsOpt, replicationFactorOpt);
}
CommandLineUtils.checkInvalidArgs(parser, options,
reportUnderReplicatedPartitionsOpt,
- invalidOptions(Collections.singleton(topicsWithOverridesOpt),
Arrays.asList(describeOpt, reportUnderReplicatedPartitionsOpt)));
+ invalidOptions(Set.of(topicsWithOverridesOpt),
List.of(describeOpt, reportUnderReplicatedPartitionsOpt)));
CommandLineUtils.checkInvalidArgs(parser, options,
reportUnderMinIsrPartitionsOpt,
- invalidOptions(Collections.singleton(topicsWithOverridesOpt),
Arrays.asList(describeOpt, reportUnderMinIsrPartitionsOpt)));
+ invalidOptions(Set.of(topicsWithOverridesOpt),
List.of(describeOpt, reportUnderMinIsrPartitionsOpt)));
CommandLineUtils.checkInvalidArgs(parser, options,
reportAtMinIsrPartitionsOpt,
- invalidOptions(Collections.singleton(topicsWithOverridesOpt),
Arrays.asList(describeOpt, reportAtMinIsrPartitionsOpt)));
+ invalidOptions(Set.of(topicsWithOverridesOpt),
List.of(describeOpt, reportAtMinIsrPartitionsOpt)));
CommandLineUtils.checkInvalidArgs(parser, options,
reportUnavailablePartitionsOpt,
- invalidOptions(Collections.singleton(topicsWithOverridesOpt),
Arrays.asList(describeOpt, reportUnavailablePartitionsOpt)));
+ invalidOptions(Set.of(topicsWithOverridesOpt),
List.of(describeOpt, reportUnavailablePartitionsOpt)));
CommandLineUtils.checkInvalidArgs(parser, options,
topicsWithOverridesOpt,
- invalidOptions(new HashSet<>(allReplicationReportOpts),
Arrays.asList(describeOpt)));
+ invalidOptions(new HashSet<>(allReplicationReportOpts),
List.of(describeOpt)));
CommandLineUtils.checkInvalidArgs(parser, options, ifExistsOpt,
- invalidOptions(Arrays.asList(alterOpt, deleteOpt,
describeOpt)));
- CommandLineUtils.checkInvalidArgs(parser, options, ifNotExistsOpt,
invalidOptions(Arrays.asList(createOpt)));
- CommandLineUtils.checkInvalidArgs(parser, options,
excludeInternalTopicOpt, invalidOptions(Arrays.asList(listOpt, describeOpt)));
+ invalidOptions(List.of(alterOpt, deleteOpt, describeOpt)));
+ CommandLineUtils.checkInvalidArgs(parser, options, ifNotExistsOpt,
invalidOptions(List.of(createOpt)));
+ CommandLineUtils.checkInvalidArgs(parser, options,
excludeInternalTopicOpt, invalidOptions(List.of(listOpt, describeOpt)));
}
private Set<OptionSpec<?>> invalidOptions(List<OptionSpec<?>>
removeOptions) {
diff --git
a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
index 7a79f51046c..2930d490f4f 100644
--- a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
+++ b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
@@ -47,17 +47,16 @@ import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Collection;
-import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-import static java.util.Collections.singleton;
import static net.sourceforge.argparse4j.impl.Arguments.store;
import static net.sourceforge.argparse4j.impl.Arguments.storeTrue;
@@ -240,13 +239,13 @@ public class TransactionalMessageCopier {
if (offsetAndMetadata != null)
consumer.seek(tp, offsetAndMetadata.offset());
else
- consumer.seekToBeginning(singleton(tp));
+ consumer.seekToBeginning(Set.of(tp));
});
}
private static long messagesRemaining(KafkaConsumer<String, String>
consumer, TopicPartition partition) {
long currentPosition = consumer.position(partition);
- Map<TopicPartition, Long> endOffsets =
consumer.endOffsets(singleton(partition));
+ Map<TopicPartition, Long> endOffsets =
consumer.endOffsets(Set.of(partition));
if (endOffsets.containsKey(partition)) {
return endOffsets.get(partition) - currentPosition;
}
@@ -319,7 +318,7 @@ public class TransactionalMessageCopier {
final AtomicLong numMessagesProcessedSinceLastRebalance = new
AtomicLong(0);
final AtomicLong totalMessageProcessed = new AtomicLong(0);
if (groupMode) {
- consumer.subscribe(Collections.singleton(topicName), new
ConsumerRebalanceListener() {
+ consumer.subscribe(Set.of(topicName), new
ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition>
partitions) {
}
@@ -341,7 +340,7 @@ public class TransactionalMessageCopier {
});
} else {
TopicPartition inputPartition = new TopicPartition(topicName,
parsedArgs.getInt("inputPartition"));
- consumer.assign(singleton(inputPartition));
+ consumer.assign(Set.of(inputPartition));
remainingMessages.set(Math.min(messagesRemaining(consumer,
inputPartition), remainingMessages.get()));
}
diff --git
a/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
b/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
index c1dbefcef60..9c5323104fe 100644
--- a/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
@@ -50,7 +50,6 @@ import java.io.IOException;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -65,8 +64,6 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import static java.util.Arrays.asList;
-import static java.util.Collections.singleton;
-import static java.util.Collections.singletonList;
import static net.sourceforge.argparse4j.impl.Arguments.store;
public abstract class TransactionsCommand {
@@ -159,7 +156,7 @@ public abstract class TransactionsCommand {
) throws Exception {
final DescribeProducersResult.PartitionProducerState result;
try {
- result = admin.describeProducers(singleton(topicPartition))
+ result = admin.describeProducers(Set.of(topicPartition))
.partitionResult(topicPartition)
.get();
} catch (ExecutionException e) {
@@ -345,7 +342,7 @@ public abstract class TransactionsCommand {
final DescribeProducersResult.PartitionProducerState result;
try {
- result = admin.describeProducers(singleton(topicPartition),
options)
+ result = admin.describeProducers(Set.of(topicPartition),
options)
.partitionResult(topicPartition)
.get();
} catch (ExecutionException e) {
@@ -418,7 +415,7 @@ public abstract class TransactionsCommand {
final TransactionDescription result;
try {
- result = admin.describeTransactions(singleton(transactionalId))
+ result = admin.describeTransactions(Set.of(transactionalId))
.description(transactionalId)
.get();
} catch (ExecutionException e) {
@@ -451,7 +448,7 @@ public abstract class TransactionsCommand {
result.topicPartitions().stream().map(TopicPartition::toString).collect(Collectors.joining(","))
);
- ToolsUtils.prettyPrintTable(HEADERS, singletonList(row), out);
+ ToolsUtils.prettyPrintTable(HEADERS, List.of(row), out);
}
}
@@ -615,7 +612,7 @@ public abstract class TransactionsCommand {
);
if (candidates.isEmpty()) {
- printHangingTransactions(Collections.emptyList(), out);
+ printHangingTransactions(List.of(), out);
} else {
Map<Long, List<OpenTransaction>> openTransactionsByProducerId
= groupByProducerId(candidates);
@@ -649,9 +646,9 @@ public abstract class TransactionsCommand {
if (topic.isPresent()) {
if (partition.isPresent()) {
- return Collections.singletonList(new
TopicPartition(topic.get(), partition.get()));
+ return List.of(new TopicPartition(topic.get(),
partition.get()));
} else {
- topics = Collections.singletonList(topic.get());
+ topics = List.of(topic.get());
}
} else {
topics = listTopics(admin);
@@ -752,7 +749,7 @@ public abstract class TransactionsCommand {
} catch (ExecutionException e) {
printErrorAndExit("Failed to describe " +
transactionalIds.size()
+ " transactions", e.getCause());
- return Collections.emptyMap();
+ return Map.of();
}
}
@@ -778,7 +775,7 @@ public abstract class TransactionsCommand {
return new
ArrayList<>(admin.listTopics(listOptions).names().get());
} catch (ExecutionException e) {
printErrorAndExit("Failed to list topics", e.getCause());
- return Collections.emptyList();
+ return List.of();
}
}
@@ -788,14 +785,14 @@ public abstract class TransactionsCommand {
List<String> topics
) throws Exception {
List<TopicPartition> topicPartitions = new ArrayList<>();
- consumeInBatches(topics, MAX_BATCH_SIZE, batch -> {
+ consumeInBatches(topics, MAX_BATCH_SIZE, batch ->
findTopicPartitions(
admin,
brokerId,
batch,
topicPartitions
- );
- });
+ )
+ );
return topicPartitions;
}
@@ -807,13 +804,13 @@ public abstract class TransactionsCommand {
) throws Exception {
try {
Map<String, TopicDescription> topicDescriptions =
admin.describeTopics(topics).allTopicNames().get();
- topicDescriptions.forEach((topic, description) -> {
+ topicDescriptions.forEach((topic, description) ->
description.partitions().forEach(partitionInfo -> {
if (brokerId.isEmpty() || hasReplica(brokerId.get(),
partitionInfo)) {
topicPartitions.add(new TopicPartition(topic,
partitionInfo.partition()));
}
- });
- });
+ })
+ );
} catch (ExecutionException e) {
printErrorAndExit("Failed to describe " + topics.size() + "
topics", e.getCause());
}
@@ -838,15 +835,15 @@ public abstract class TransactionsCommand {
List<OpenTransaction> candidateTransactions = new ArrayList<>();
- consumeInBatches(topicPartitions, MAX_BATCH_SIZE, batch -> {
+ consumeInBatches(topicPartitions, MAX_BATCH_SIZE, batch ->
collectCandidateOpenTransactions(
admin,
brokerId,
maxTransactionTimeoutMs,
batch,
candidateTransactions
- );
- });
+ )
+ );
return candidateTransactions;
}
@@ -880,7 +877,7 @@ public abstract class TransactionsCommand {
long currentTimeMs = time.milliseconds();
- producersByPartition.forEach((topicPartition, producersStates)
-> {
+ producersByPartition.forEach((topicPartition, producersStates)
->
producersStates.activeProducers().forEach(activeProducer
-> {
if
(activeProducer.currentTransactionStartOffset().isPresent()) {
long transactionDurationMs = currentTimeMs -
activeProducer.lastTimestamp();
@@ -891,8 +888,8 @@ public abstract class TransactionsCommand {
));
}
}
- });
- });
+ })
+ );
} catch (ExecutionException e) {
printErrorAndExit("Failed to describe producers for " +
topicPartitions.size() +
" partitions on broker " + brokerId, e.getCause());
@@ -928,7 +925,7 @@ public abstract class TransactionsCommand {
} catch (ExecutionException e) {
printErrorAndExit("Failed to list transactions for " +
producerIds.size() +
" producers", e.getCause());
- return Collections.emptyMap();
+ return Map.of();
}
}
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
index 0da56f1340b..1fa1aed8c4f 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
@@ -58,7 +58,6 @@ import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -231,7 +230,7 @@ public class VerifiableConsumer implements Closeable,
OffsetCommitCallback, Cons
public void run() {
try {
printJson(new StartupComplete());
- consumer.subscribe(Collections.singletonList(topic), this);
+ consumer.subscribe(List.of(topic), this);
while (!isFinished()) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
@@ -623,7 +622,7 @@ public class VerifiableConsumer implements Closeable,
OffsetCommitCallback, Cons
boolean useAutoCommit = res.getBoolean("useAutoCommit");
String configFile = res.getString("consumer.config");
- String brokerHostandPort = res.getString("bootstrapServer");
+ String brokerHostAndPort = res.getString("bootstrapServer");
Properties consumerProps = new Properties();
if (configFile != null) {
@@ -664,7 +663,7 @@ public class VerifiableConsumer implements Closeable,
OffsetCommitCallback, Cons
consumerProps.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG,
groupInstanceId);
}
- consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
brokerHostandPort);
+ consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
brokerHostAndPort);
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
useAutoCommit);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
res.getString("resetPolicy"));
diff --git
a/tools/src/main/java/org/apache/kafka/tools/VerifiableShareConsumer.java
b/tools/src/main/java/org/apache/kafka/tools/VerifiableShareConsumer.java
index 496c41412cc..d83c1e44244 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableShareConsumer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableShareConsumer.java
@@ -61,7 +61,6 @@ import java.io.PrintStream;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
@@ -421,7 +420,7 @@ public class VerifiableShareConsumer implements Closeable,
AcknowledgementCommit
printJson(new
OffsetResetStrategySet(offsetResetStrategy.type().toString()));
}
- consumer.subscribe(Collections.singleton(this.topic));
+ consumer.subscribe(Set.of(this.topic));
consumer.setAcknowledgementCommitCallback(this);
while (!(maxMessages >= 0 && totalAcknowledged >= maxMessages)) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(5000));
diff --git
a/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java
b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java
index f7c1402f25e..74a928b9727 100644
--- a/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java
@@ -37,6 +37,7 @@ import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
@@ -160,7 +161,7 @@ public class ConsoleConsumer {
if (opts.partitionArg().isPresent()) {
seek(topic.get(), opts.partitionArg().getAsInt(),
opts.offsetArg());
} else {
- consumer.subscribe(Collections.singletonList(topic.get()));
+ consumer.subscribe(List.of(topic.get()));
}
} else {
opts.includedTopicsArg().ifPresent(topics ->
consumer.subscribe(Pattern.compile(topics)));
@@ -169,11 +170,11 @@ public class ConsoleConsumer {
private void seek(String topic, int partitionId, long offset) {
TopicPartition topicPartition = new TopicPartition(topic,
partitionId);
- consumer.assign(Collections.singletonList(topicPartition));
+ consumer.assign(List.of(topicPartition));
if (offset == ListOffsetsRequest.EARLIEST_TIMESTAMP) {
-
consumer.seekToBeginning(Collections.singletonList(topicPartition));
+ consumer.seekToBeginning(List.of(topicPartition));
} else if (offset == ListOffsetsRequest.LATEST_TIMESTAMP) {
- consumer.seekToEnd(Collections.singletonList(topicPartition));
+ consumer.seekToEnd(List.of(topicPartition));
} else {
consumer.seek(topicPartition, offset);
}
diff --git
a/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java
b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java
index 5c6d26f433e..cf4daa0c636 100644
---
a/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java
+++
b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java
@@ -26,7 +26,6 @@ import org.apache.kafka.server.util.CommandLineUtils;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -186,9 +185,9 @@ public final class ConsoleConsumerOptions extends
CommandDefaultOptions {
}
private void checkRequiredArgs() {
- List<Optional<String>> topicOrFilterArgs = new
ArrayList<>(Arrays.asList(topicArg(), includedTopicsArg()));
- topicOrFilterArgs.removeIf(arg -> arg.isEmpty());
- // user need to specify value for either --topic or --include options)
+ List<Optional<String>> topicOrFilterArgs = new
ArrayList<>(List.of(topicArg(), includedTopicsArg()));
+ topicOrFilterArgs.removeIf(Optional::isEmpty);
+ // user need to specify value for either --topic or --include options
if (topicOrFilterArgs.size() != 1) {
CommandLineUtils.printUsageAndExit(parser, "Exactly one of
--include/--topic is required. ");
}
diff --git
a/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleShareConsumer.java
b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleShareConsumer.java
index 904f2333f2f..2505266abf7 100644
---
a/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleShareConsumer.java
+++
b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleShareConsumer.java
@@ -36,6 +36,7 @@ import java.io.PrintStream;
import java.time.Duration;
import java.util.Collections;
import java.util.Iterator;
+import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
@@ -166,7 +167,7 @@ public class ConsoleShareConsumer {
this.consumer = consumer;
this.timeoutMs = timeoutMs;
- consumer.subscribe(Collections.singletonList(topic));
+ consumer.subscribe(List.of(topic));
}
ConsumerRecord<byte[], byte[]> receive() {
diff --git
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java
index 6a0d1ffd224..2cfffb9fe78 100644
---
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java
+++
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java
@@ -58,7 +58,6 @@ import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
@@ -121,7 +120,7 @@ public class ConsumerGroupCommand {
return;
}
- try (ConsumerGroupService consumerGroupService = new
ConsumerGroupService(opts, Collections.emptyMap())) {
+ try (ConsumerGroupService consumerGroupService = new
ConsumerGroupService(opts, Map.of())) {
if (opts.options.has(opts.listOpt))
consumerGroupService.listGroups();
else if (opts.options.has(opts.describeOpt))
@@ -242,14 +241,14 @@ public class ConsumerGroupCommand {
private Set<GroupState> stateValues() {
String stateValue = opts.options.valueOf(opts.stateOpt);
return (stateValue == null || stateValue.isEmpty())
- ? Collections.emptySet()
+ ? Set.of()
: groupStatesFromString(stateValue);
}
private Set<GroupType> typeValues() {
String typeValue = opts.options.valueOf(opts.typeOpt);
return (typeValue == null || typeValue.isEmpty())
- ? Collections.emptySet()
+ ? Set.of()
: consumerGroupTypesFromString(typeValue);
}
@@ -585,7 +584,7 @@ public class ConsumerGroupCommand {
Optional<String> clientIdOpt
) {
if (topicPartitions.isEmpty()) {
- return Collections.singleton(
+ return Set.of(
new PartitionAssignmentState(group, coordinator,
Optional.empty(), Optional.empty(), Optional.empty(),
getLag(Optional.empty(), Optional.empty()),
consumerIdOpt, hostOpt, clientIdOpt, Optional.empty(), Optional.empty())
);
@@ -684,7 +683,7 @@ public class ConsumerGroupCommand {
break;
default:
printError("Assignments can only be reset if the
group '" + groupId + "' is inactive, but the current state is " + state + ".",
Optional.empty());
- result.put(groupId, Collections.emptyMap());
+ result.put(groupId, Map.of());
}
} catch (InterruptedException ie) {
throw new RuntimeException(ie);
@@ -855,7 +854,7 @@ public class ConsumerGroupCommand {
* Returns the state of the specified consumer group and partition
assignment states
*/
Entry<Optional<GroupState>,
Optional<Collection<PartitionAssignmentState>>> collectGroupOffsets(String
groupId) throws Exception {
- return
collectGroupsOffsets(Collections.singletonList(groupId)).getOrDefault(groupId,
new SimpleImmutableEntry<>(Optional.empty(), Optional.empty()));
+ return
collectGroupsOffsets(List.of(groupId)).getOrDefault(groupId, new
SimpleImmutableEntry<>(Optional.empty(), Optional.empty()));
}
/**
@@ -899,7 +898,7 @@ public class ConsumerGroupCommand {
Optional.of(MISSING_COLUMN_VALUE),
Optional.of(MISSING_COLUMN_VALUE),
Optional.of(MISSING_COLUMN_VALUE))
- : Collections.emptyList();
+ : List.of();
rowsWithConsumer.addAll(rowsWithoutConsumer);
@@ -910,7 +909,7 @@ public class ConsumerGroupCommand {
}
Entry<Optional<GroupState>,
Optional<Collection<MemberAssignmentState>>> collectGroupMembers(String
groupId) throws Exception {
- return
collectGroupsMembers(Collections.singleton(groupId)).get(groupId);
+ return collectGroupsMembers(Set.of(groupId)).get(groupId);
}
TreeMap<String, Entry<Optional<GroupState>,
Optional<Collection<MemberAssignmentState>>>>
collectGroupsMembers(Collection<String> groupIds) throws Exception {
@@ -928,7 +927,7 @@ public class ConsumerGroupCommand {
consumer.groupInstanceId().orElse(""),
consumer.assignment().topicPartitions().size(),
consumer.assignment().topicPartitions().stream().toList(),
- consumer.targetAssignment().map(a ->
a.topicPartitions().stream().toList()).orElse(Collections.emptyList()),
+ consumer.targetAssignment().map(a ->
a.topicPartitions().stream().toList()).orElse(List.of()),
consumer.memberEpoch(),
consumerGroup.targetAssignmentEpoch(),
consumer.upgraded()
@@ -939,7 +938,7 @@ public class ConsumerGroupCommand {
}
GroupInformation collectGroupState(String groupId) throws Exception {
- return
collectGroupsState(Collections.singleton(groupId)).get(groupId);
+ return collectGroupsState(Set.of(groupId)).get(groupId);
}
TreeMap<String, GroupInformation>
collectGroupsState(Collection<String> groupIds) throws Exception {
@@ -986,14 +985,14 @@ public class ConsumerGroupCommand {
if (!opts.options.has(opts.resetFromFileOpt))
CommandLineUtils.printUsageAndExit(opts.parser, "One of
the reset scopes should be defined: --all-topics, --topic.");
- return Collections.emptyList();
+ return List.of();
}
}
private Map<TopicPartition, OffsetAndMetadata>
getCommittedOffsets(String groupId) {
try {
return adminClient.listConsumerGroupOffsets(
- Collections.singletonMap(groupId, new
ListConsumerGroupOffsetsSpec()),
+ Map.of(groupId, new ListConsumerGroupOffsetsSpec()),
withTimeoutMs(new ListConsumerGroupOffsetsOptions())
).partitionsToOffsetAndMetadata(groupId).get();
} catch (InterruptedException | ExecutionException e) {
diff --git
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandOptions.java
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandOptions.java
index 553ea3acb8a..30ba137cd8d 100644
---
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandOptions.java
+++
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandOptions.java
@@ -22,8 +22,6 @@ import org.apache.kafka.server.util.CommandLineUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Arrays;
-import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
@@ -203,11 +201,11 @@ public class ConsumerGroupCommandOptions extends
CommandDefaultOptions {
.describedAs("regex")
.ofType(String.class);
- allGroupSelectionScopeOpts = new HashSet<>(Arrays.asList(groupOpt,
allGroupsOpt));
- allConsumerGroupLevelOpts = new HashSet<>(Arrays.asList(listOpt,
describeOpt, deleteOpt, resetOffsetsOpt));
- allResetOffsetScenarioOpts = new
HashSet<>(Arrays.asList(resetToOffsetOpt, resetShiftByOpt,
- resetToDatetimeOpt, resetByDurationOpt, resetToEarliestOpt,
resetToLatestOpt, resetToCurrentOpt, resetFromFileOpt));
- allDeleteOffsetsOpts = new HashSet<>(Arrays.asList(groupOpt,
topicOpt));
+ allGroupSelectionScopeOpts = Set.of(groupOpt, allGroupsOpt);
+ allConsumerGroupLevelOpts = Set.of(listOpt, describeOpt, deleteOpt,
resetOffsetsOpt);
+ allResetOffsetScenarioOpts = Set.of(resetToOffsetOpt, resetShiftByOpt,
+ resetToDatetimeOpt, resetByDurationOpt, resetToEarliestOpt,
resetToLatestOpt, resetToCurrentOpt, resetFromFileOpt);
+ allDeleteOffsetsOpts = Set.of(groupOpt, topicOpt);
options = parser.parse(args);
}
@@ -224,7 +222,7 @@ public class ConsumerGroupCommandOptions extends
CommandDefaultOptions {
if (!options.has(groupOpt) && !options.has(allGroupsOpt))
CommandLineUtils.printUsageAndExit(parser,
"Option " + describeOpt + " takes one of these options: " +
allGroupSelectionScopeOpts.stream().map(Object::toString).collect(Collectors.joining(",
")));
- List<OptionSpec<?>> mutuallyExclusiveOpts =
Arrays.asList(membersOpt, offsetsOpt, stateOpt);
+ List<OptionSpec<?>> mutuallyExclusiveOpts = List.of(membersOpt,
offsetsOpt, stateOpt);
if (mutuallyExclusiveOpts.stream().mapToInt(o -> options.has(o) ?
1 : 0).sum() > 1) {
CommandLineUtils.printUsageAndExit(parser,
"Option " + describeOpt + " takes at most one of these
options: " +
mutuallyExclusiveOpts.stream().map(Object::toString).collect(Collectors.joining(",
")));
diff --git
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
index 0e5fc6167ee..8996f92b84e 100644
---
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
+++
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
@@ -387,7 +387,7 @@ public class ShareGroupCommand {
TreeMap<String, ShareGroupDescription>
collectGroupsDescription(Collection<String> groupIds) throws
ExecutionException, InterruptedException {
Map<String, ShareGroupDescription> shareGroups =
describeShareGroups(groupIds);
TreeMap<String, ShareGroupDescription> res = new TreeMap<>();
- shareGroups.forEach(res::put);
+ res.putAll(shareGroups);
return res;
}
diff --git
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java
index be99d2946a7..9a96cad00ed 100644
---
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java
+++
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java
@@ -22,8 +22,6 @@ import org.apache.kafka.server.util.CommandLineUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Arrays;
-import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
@@ -144,10 +142,10 @@ public class ShareGroupCommandOptions extends
CommandDefaultOptions {
verboseOpt = parser.accepts("verbose", VERBOSE_DOC)
.availableIf(describeOpt);
- allGroupSelectionScopeOpts = new HashSet<>(Arrays.asList(groupOpt,
allGroupsOpt));
- allShareGroupLevelOpts = new HashSet<>(Arrays.asList(listOpt,
describeOpt, deleteOpt, resetOffsetsOpt));
- allResetOffsetScenarioOpts = new
HashSet<>(Arrays.asList(resetToDatetimeOpt, resetToEarliestOpt,
resetToLatestOpt));
- allDeleteOffsetsOpts = new HashSet<>(Arrays.asList(groupOpt,
topicOpt));
+ allGroupSelectionScopeOpts = Set.of(groupOpt, allGroupsOpt);
+ allShareGroupLevelOpts = Set.of(listOpt, describeOpt, deleteOpt,
resetOffsetsOpt);
+ allResetOffsetScenarioOpts = Set.of(resetToDatetimeOpt,
resetToEarliestOpt, resetToLatestOpt);
+ allDeleteOffsetsOpts = Set.of(groupOpt, topicOpt);
options = parser.parse(args);
}
@@ -162,7 +160,7 @@ public class ShareGroupCommandOptions extends
CommandDefaultOptions {
if (!options.has(groupOpt) && !options.has(allGroupsOpt))
CommandLineUtils.printUsageAndExit(parser,
"Option " + describeOpt + " takes one of these options: "
+
allGroupSelectionScopeOpts.stream().map(Object::toString).collect(Collectors.joining(",
")));
- List<OptionSpec<?>> mutuallyExclusiveOpts =
Arrays.asList(membersOpt, offsetsOpt, stateOpt);
+ List<OptionSpec<?>> mutuallyExclusiveOpts = List.of(membersOpt,
offsetsOpt, stateOpt);
if (mutuallyExclusiveOpts.stream().mapToInt(o -> options.has(o) ?
1 : 0).sum() > 1) {
CommandLineUtils.printUsageAndExit(parser,
"Option " + describeOpt + " takes at most one of these
options: " +
mutuallyExclusiveOpts.stream().map(Object::toString).collect(Collectors.joining(",
")));
diff --git
a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
index 329b202b4a8..334f0738ca3 100644
---
a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
+++
b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
@@ -226,7 +226,7 @@ public class ReassignPartitionsCommand {
if (!partsOngoing && !movesOngoing && !preserveThrottles) {
// If the partition assignments and replica assignments are done,
clear any throttles
// that were set. We have to clear all throttles, because we
don't have enough
- // information to know all of the source brokers that might have
been involved in the
+ // information to know all the source brokers that might have been
involved in the
// previous reassignments.
clearAllThrottles(adminClient, targetParts);
}
@@ -849,7 +849,7 @@ public class ReassignPartitionsCommand {
.map(Object::toString)
.collect(Collectors.joining(","))));
} else {
- // If a replica has been moved to a new host and we also
specified a particular
+ // If a replica has been moved to a new host, and we also
specified a particular
// log directory, we will have to keep retrying the
alterReplicaLogDirs
// call. It can't take effect until the replica is moved to
that host.
time.sleep(100);
@@ -1342,21 +1342,18 @@ public class ReassignPartitionsCommand {
}
private static List<String> parseTopicsData(int version, JsonValue js)
throws JsonMappingException {
- switch (version) {
- case 1:
- List<String> results = new ArrayList<>();
- Optional<JsonValue> partitionsSeq =
js.asJsonObject().get("topics");
- if (partitionsSeq.isPresent()) {
- Iterator<JsonValue> iter =
partitionsSeq.get().asJsonArray().iterator();
- while (iter.hasNext()) {
-
results.add(iter.next().asJsonObject().apply("topic").to(STRING));
- }
+ if (version == 1) {
+ List<String> results = new ArrayList<>();
+ Optional<JsonValue> partitionsSeq =
js.asJsonObject().get("topics");
+ if (partitionsSeq.isPresent()) {
+ Iterator<JsonValue> iter =
partitionsSeq.get().asJsonArray().iterator();
+ while (iter.hasNext()) {
+
results.add(iter.next().asJsonObject().apply("topic").to(STRING));
}
- return results;
-
- default:
- throw new AdminOperationException("Not supported version field
value " + version);
+ }
+ return results;
}
+ throw new AdminOperationException("Not supported version field value "
+ version);
}
private static Entry<List<Entry<TopicPartition, List<Integer>>>,
Map<TopicPartitionReplica, String>> parsePartitionReassignmentData(
@@ -1376,46 +1373,43 @@ public class ReassignPartitionsCommand {
private static Entry<List<Entry<TopicPartition, List<Integer>>>,
Map<TopicPartitionReplica, String>> parsePartitionReassignmentData(
int version, JsonValue jsonData
) throws JsonMappingException {
- switch (version) {
- case 1:
- List<Entry<TopicPartition, List<Integer>>> partitionAssignment
= new ArrayList<>();
- Map<TopicPartitionReplica, String> replicaAssignment = new
HashMap<>();
-
- Optional<JsonValue> partitionsSeq =
jsonData.asJsonObject().get("partitions");
- if (partitionsSeq.isPresent()) {
- Iterator<JsonValue> iter =
partitionsSeq.get().asJsonArray().iterator();
- while (iter.hasNext()) {
- JsonObject partitionFields =
iter.next().asJsonObject();
- String topic =
partitionFields.apply("topic").to(STRING);
- int partition =
partitionFields.apply("partition").to(INT);
- List<Integer> newReplicas =
partitionFields.apply("replicas").to(INT_LIST);
- Optional<JsonValue> logDirsOpts =
partitionFields.get("log_dirs");
- List<String> newLogDirs;
- if (logDirsOpts.isPresent())
- newLogDirs = logDirsOpts.get().to(STRING_LIST);
- else
- newLogDirs = newReplicas.stream().map(r ->
ANY_LOG_DIR).collect(Collectors.toList());
- if (newReplicas.size() != newLogDirs.size())
- throw new AdminCommandFailedException("Size of
replicas list " + newReplicas + " is different from " +
- "size of log dirs list " + newLogDirs + " for
partition " + new TopicPartition(topic, partition));
- partitionAssignment.add(Map.entry(new
TopicPartition(topic, partition), newReplicas));
- for (int i = 0; i < newLogDirs.size(); i++) {
- Integer replica = newReplicas.get(i);
- String logDir = newLogDirs.get(i);
-
- if (logDir.equals(ANY_LOG_DIR))
- continue;
-
- replicaAssignment.put(new
TopicPartitionReplica(topic, partition, replica), logDir);
- }
+ if (version == 1) {
+ List<Entry<TopicPartition, List<Integer>>> partitionAssignment =
new ArrayList<>();
+ Map<TopicPartitionReplica, String> replicaAssignment = new
HashMap<>();
+
+ Optional<JsonValue> partitionsSeq =
jsonData.asJsonObject().get("partitions");
+ if (partitionsSeq.isPresent()) {
+ Iterator<JsonValue> iter =
partitionsSeq.get().asJsonArray().iterator();
+ while (iter.hasNext()) {
+ JsonObject partitionFields = iter.next().asJsonObject();
+ String topic = partitionFields.apply("topic").to(STRING);
+ int partition = partitionFields.apply("partition").to(INT);
+ List<Integer> newReplicas =
partitionFields.apply("replicas").to(INT_LIST);
+ Optional<JsonValue> logDirsOpts =
partitionFields.get("log_dirs");
+ List<String> newLogDirs;
+ if (logDirsOpts.isPresent())
+ newLogDirs = logDirsOpts.get().to(STRING_LIST);
+ else
+ newLogDirs = newReplicas.stream().map(r ->
ANY_LOG_DIR).collect(Collectors.toList());
+ if (newReplicas.size() != newLogDirs.size())
+ throw new AdminCommandFailedException("Size of
replicas list " + newReplicas + " is different from " +
+ "size of log dirs list " + newLogDirs + " for
partition " + new TopicPartition(topic, partition));
+ partitionAssignment.add(Map.entry(new
TopicPartition(topic, partition), newReplicas));
+ for (int i = 0; i < newLogDirs.size(); i++) {
+ Integer replica = newReplicas.get(i);
+ String logDir = newLogDirs.get(i);
+
+ if (logDir.equals(ANY_LOG_DIR))
+ continue;
+
+ replicaAssignment.put(new TopicPartitionReplica(topic,
partition, replica), logDir);
}
}
+ }
- return Map.entry(partitionAssignment, replicaAssignment);
-
- default:
- throw new AdminOperationException("Not supported version field
value " + version);
+ return Map.entry(partitionAssignment, replicaAssignment);
}
+ throw new AdminOperationException("Not supported version field value "
+ version);
}
static ReassignPartitionsCommandOptions validateAndParseArgs(String[]
args) {
diff --git
a/tools/src/main/java/org/apache/kafka/tools/reassign/VerifyAssignmentResult.java
b/tools/src/main/java/org/apache/kafka/tools/reassign/VerifyAssignmentResult.java
index 4812f1f19e9..9dbde0fd1af 100644
---
a/tools/src/main/java/org/apache/kafka/tools/reassign/VerifyAssignmentResult.java
+++
b/tools/src/main/java/org/apache/kafka/tools/reassign/VerifyAssignmentResult.java
@@ -20,7 +20,6 @@ package org.apache.kafka.tools.reassign;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionReplica;
-import java.util.Collections;
import java.util.Map;
import java.util.Objects;
@@ -34,7 +33,7 @@ public final class VerifyAssignmentResult {
public final boolean movesOngoing;
public VerifyAssignmentResult(Map<TopicPartition,
PartitionReassignmentState> partStates) {
- this(partStates, false, Collections.emptyMap(), false);
+ this(partStates, false, Map.of(), false);
}
/**
diff --git
a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
index 7ac61e9df9e..0f68bf82900 100644
---
a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
+++
b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
@@ -61,7 +61,6 @@ import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
@@ -457,7 +456,7 @@ public class StreamsGroupCommand {
boolean allPresent =
topicPartitions.stream().allMatch(allTopicPartitions::containsKey);
if (!allPresent) {
printError("One or more topics are not part of the group
'" + groupId + "'.", Optional.empty());
- return Collections.emptyList();
+ return List.of();
}
return topicPartitions;
} catch (InterruptedException | ExecutionException e) {
@@ -508,7 +507,7 @@ public class StreamsGroupCommand {
break;
default:
printError("Assignments can only be reset if
the group '" + groupId + "' is inactive, but the current state is " + state +
".", Optional.empty());
- result.put(groupId, Collections.emptyMap());
+ result.put(groupId, Map.of());
}
} catch (InterruptedException ie) {
throw new RuntimeException(ie);
@@ -889,7 +888,7 @@ public class StreamsGroupCommand {
if (!opts.options.has(opts.resetFromFileOpt))
CommandLineUtils.printUsageAndExit(opts.parser, "One of
the reset scopes should be defined: --all-topics, --topic.");
- return Collections.emptyList();
+ return List.of();
}
}
diff --git
a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java
b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java
index e74104d27ff..6ce387d3dbd 100644
---
a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java
+++
b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java
@@ -22,8 +22,6 @@ import org.apache.kafka.server.util.CommandLineUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Arrays;
-import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
@@ -200,12 +198,12 @@ public class StreamsGroupCommandOptions extends
CommandDefaultOptions {
exportOpt = parser.accepts("export", EXPORT_DOC);
options = parser.parse(args);
- allResetOffsetScenarioOpts = new
HashSet<>(Arrays.asList(resetToOffsetOpt, resetShiftByOpt,
- resetToDatetimeOpt, resetByDurationOpt, resetToEarliestOpt,
resetToLatestOpt, resetToCurrentOpt, resetFromFileOpt));
- allGroupSelectionScopeOpts = new HashSet<>(Arrays.asList(groupOpt,
allGroupsOpt));
- allStreamsGroupLevelOpts = new HashSet<>(Arrays.asList(listOpt,
describeOpt, deleteOpt));
- allDeleteOffsetsOpts = new HashSet<>(Arrays.asList(inputTopicOpt,
allInputTopicsOpt));
- allDeleteInternalGroupsOpts = new
HashSet<>(Arrays.asList(resetOffsetsOpt, deleteOpt));
+ allResetOffsetScenarioOpts = Set.of(resetToOffsetOpt, resetShiftByOpt,
+ resetToDatetimeOpt, resetByDurationOpt, resetToEarliestOpt,
resetToLatestOpt, resetToCurrentOpt, resetFromFileOpt);
+ allGroupSelectionScopeOpts = Set.of(groupOpt, allGroupsOpt);
+ allStreamsGroupLevelOpts = Set.of(listOpt, describeOpt, deleteOpt);
+ allDeleteOffsetsOpts = Set.of(inputTopicOpt, allInputTopicsOpt);
+ allDeleteInternalGroupsOpts = Set.of(resetOffsetsOpt, deleteOpt);
}
@SuppressWarnings("NPathComplexity")
@@ -256,7 +254,7 @@ public class StreamsGroupCommandOptions extends
CommandDefaultOptions {
if (!options.has(groupOpt) && !options.has(allGroupsOpt))
CommandLineUtils.printUsageAndExit(parser,
"Option " + describeOpt + " takes one of these options: " +
allGroupSelectionScopeOpts.stream().map(Object::toString).collect(Collectors.joining(",
")));
- List<OptionSpec<?>> mutuallyExclusiveOpts = Arrays.asList(membersOpt,
offsetsOpt, stateOpt);
+ List<OptionSpec<?>> mutuallyExclusiveOpts = List.of(membersOpt,
offsetsOpt, stateOpt);
if (mutuallyExclusiveOpts.stream().mapToInt(o -> options.has(o) ? 1 :
0).sum() > 1) {
CommandLineUtils.printUsageAndExit(parser,
"Option " + describeOpt + " takes at most one of these
options: " +
mutuallyExclusiveOpts.stream().map(Object::toString).collect(Collectors.joining(",
")));