This is an automated email from the ASF dual-hosted git repository. ijuma pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 398d2ab MINOR: KafkaAdminClient Java 8 code cleanup (#5594) 398d2ab is described below commit 398d2ab244a40e7f975faa1ed60c0e0b14cf4674 Author: Viktor Somogyi <viktorsomo...@gmail.com> AuthorDate: Thu Sep 13 06:13:03 2018 +0200 MINOR: KafkaAdminClient Java 8 code cleanup (#5594) Use lambdas and diamond operator whenever possible. Reviewers: Ismael Juma <ism...@juma.me.uk> --- .../kafka/clients/admin/KafkaAdminClient.java | 73 ++++++++++------------ 1 file changed, 34 insertions(+), 39 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 5759d63..7abe7ef 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -379,7 +379,7 @@ public class KafkaAdminClient extends AdminClient { String clientId = generateClientId(config); try { - metrics = new Metrics(new MetricConfig(), new LinkedList<MetricsReporter>(), time); + metrics = new Metrics(new MetricConfig(), new LinkedList<>(), time); LogContext logContext = createLogContext(clientId); AdminMetadataManager metadataManager = new AdminMetadataManager(logContext, config.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG), @@ -897,7 +897,7 @@ public class KafkaAdminClient extends AdminClient { } Call call = calls.remove(0); int timeoutMs = calcTimeoutMsRemainingAsInt(now, call.deadlineMs); - AbstractRequest.Builder<?> requestBuilder = null; + AbstractRequest.Builder<?> requestBuilder; try { requestBuilder = call.createRequest(timeoutMs); } catch (Throwable throwable) { @@ -1201,7 +1201,7 @@ public class KafkaAdminClient extends AdminClient { // Since this only requests node information, it's safe to pass true // for allowAutoTopicCreation (and it simplifies communication with // older brokers) - return new MetadataRequest.Builder(Collections.<String>emptyList(), true); + return new MetadataRequest.Builder(Collections.emptyList(), true); } @Override @@ -1248,7 +1248,7 @@ public class KafkaAdminClient extends AdminClient { newTopic.name() + "' cannot be represented in a request.")); topicFutures.put(newTopic.name(), future); } else if (!topicFutures.containsKey(newTopic.name())) { - topicFutures.put(newTopic.name(), new KafkaFutureImpl<Void>()); + topicFutures.put(newTopic.name(), new KafkaFutureImpl<>()); topicsMap.put(newTopic.name(), newTopic.convertToTopicDetails()); } } @@ -1304,7 +1304,7 @@ public class KafkaAdminClient extends AdminClient { if (!topicsMap.isEmpty()) { runnable.call(call, now); } - return new CreateTopicsResult(new HashMap<String, KafkaFuture<Void>>(topicFutures)); + return new CreateTopicsResult(new HashMap<>(topicFutures)); } @Override @@ -1319,7 +1319,7 @@ public class KafkaAdminClient extends AdminClient { topicName + "' cannot be represented in a request.")); topicFutures.put(topicName, future); } else if (!topicFutures.containsKey(topicName)) { - topicFutures.put(topicName, new KafkaFutureImpl<Void>()); + topicFutures.put(topicName, new KafkaFutureImpl<>()); validTopicNames.add(topicName); } } @@ -1375,7 +1375,7 @@ public class KafkaAdminClient extends AdminClient { if (!validTopicNames.isEmpty()) { runnable.call(call, now); } - return new DeleteTopicsResult(new HashMap<String, KafkaFuture<Void>>(topicFutures)); + return new DeleteTopicsResult(new HashMap<>(topicFutures)); } @Override @@ -1417,12 +1417,12 @@ public class KafkaAdminClient extends AdminClient { final ArrayList<String> topicNamesList = new ArrayList<>(); for (String topicName : topicNames) { if (topicNameIsUnrepresentable(topicName)) { - KafkaFutureImpl<TopicDescription> future = new KafkaFutureImpl<TopicDescription>(); + KafkaFutureImpl<TopicDescription> future = new KafkaFutureImpl<>(); future.completeExceptionally(new InvalidTopicException("The given topic name '" + topicName + "' cannot be represented in a request.")); topicFutures.put(topicName, future); } else if (!topicFutures.containsKey(topicName)) { - topicFutures.put(topicName, new KafkaFutureImpl<TopicDescription>()); + topicFutures.put(topicName, new KafkaFutureImpl<>()); topicNamesList.add(topicName); } } @@ -1467,12 +1467,7 @@ public class KafkaAdminClient extends AdminClient { Arrays.asList(partitionInfo.inSyncReplicas())); partitions.add(topicPartitionInfo); } - Collections.sort(partitions, new Comparator<TopicPartitionInfo>() { - @Override - public int compare(TopicPartitionInfo tp1, TopicPartitionInfo tp2) { - return Integer.compare(tp1.partition(), tp2.partition()); - } - }); + partitions.sort(Comparator.comparingInt(TopicPartitionInfo::partition)); TopicDescription topicDescription = new TopicDescription(topicName, isInternal, partitions); future.complete(topicDescription); } @@ -1501,7 +1496,7 @@ public class KafkaAdminClient extends AdminClient { if (!topicNamesList.isEmpty()) { runnable.call(call, now); } - return new DescribeTopicsResult(new HashMap<String, KafkaFuture<TopicDescription>>(topicFutures)); + return new DescribeTopicsResult(new HashMap<>(topicFutures)); } @Override @@ -1517,7 +1512,7 @@ public class KafkaAdminClient extends AdminClient { AbstractRequest.Builder createRequest(int timeoutMs) { // Since this only requests node information, it's safe to pass true for allowAutoTopicCreation (and it // simplifies communication with older brokers) - return new MetadataRequest.Builder(Collections.<String>emptyList(), true); + return new MetadataRequest.Builder(Collections.emptyList(), true); } @Override @@ -1627,7 +1622,7 @@ public class KafkaAdminClient extends AdminClient { completeAllExceptionally(futures.values(), throwable); } }, now); - return new CreateAclsResult(new HashMap<AclBinding, KafkaFuture<Void>>(futures)); + return new CreateAclsResult(new HashMap<>(futures)); } @Override @@ -1638,7 +1633,7 @@ public class KafkaAdminClient extends AdminClient { for (AclBindingFilter filter : filters) { if (futures.get(filter) == null) { filterList.add(filter); - futures.put(filter, new KafkaFutureImpl<FilterResults>()); + futures.put(filter, new KafkaFutureImpl<>()); } } runnable.call(new Call("deleteAcls", calcDeadlineMs(now, options.timeoutMs()), @@ -1679,7 +1674,7 @@ public class KafkaAdminClient extends AdminClient { completeAllExceptionally(futures.values(), throwable); } }, now); - return new DeleteAclsResult(new HashMap<AclBindingFilter, KafkaFuture<FilterResults>>(futures)); + return new DeleteAclsResult(new HashMap<>(futures)); } @Override @@ -1899,7 +1894,7 @@ public class KafkaAdminClient extends AdminClient { final Map<TopicPartitionReplica, KafkaFutureImpl<Void>> futures = new HashMap<>(replicaAssignment.size()); for (TopicPartitionReplica replica : replicaAssignment.keySet()) - futures.put(replica, new KafkaFutureImpl<Void>()); + futures.put(replica, new KafkaFutureImpl<>()); Map<Integer, Map<TopicPartition, String>> replicaAssignmentByBroker = new HashMap<>(); for (Map.Entry<TopicPartitionReplica, String> entry: replicaAssignment.entrySet()) { @@ -1908,7 +1903,7 @@ public class KafkaAdminClient extends AdminClient { int brokerId = replica.brokerId(); TopicPartition topicPartition = new TopicPartition(replica.topic(), replica.partition()); if (!replicaAssignmentByBroker.containsKey(brokerId)) - replicaAssignmentByBroker.put(brokerId, new HashMap<TopicPartition, String>()); + replicaAssignmentByBroker.put(brokerId, new HashMap<>()); replicaAssignmentByBroker.get(brokerId).put(topicPartition, logDir); } @@ -1950,7 +1945,7 @@ public class KafkaAdminClient extends AdminClient { }, now); } - return new AlterReplicaLogDirsResult(new HashMap<TopicPartitionReplica, KafkaFuture<Void>>(futures)); + return new AlterReplicaLogDirsResult(new HashMap<>(futures)); } @Override @@ -1958,7 +1953,7 @@ public class KafkaAdminClient extends AdminClient { final Map<Integer, KafkaFutureImpl<Map<String, DescribeLogDirsResponse.LogDirInfo>>> futures = new HashMap<>(brokers.size()); for (Integer brokerId: brokers) { - futures.put(brokerId, new KafkaFutureImpl<Map<String, DescribeLogDirsResponse.LogDirInfo>>()); + futures.put(brokerId, new KafkaFutureImpl<>()); } final long now = time.milliseconds(); @@ -1990,7 +1985,7 @@ public class KafkaAdminClient extends AdminClient { }, now); } - return new DescribeLogDirsResult(new HashMap<Integer, KafkaFuture<Map<String, DescribeLogDirsResponse.LogDirInfo>>>(futures)); + return new DescribeLogDirsResult(new HashMap<>(futures)); } @Override @@ -1998,14 +1993,14 @@ public class KafkaAdminClient extends AdminClient { final Map<TopicPartitionReplica, KafkaFutureImpl<DescribeReplicaLogDirsResult.ReplicaLogDirInfo>> futures = new HashMap<>(replicas.size()); for (TopicPartitionReplica replica : replicas) { - futures.put(replica, new KafkaFutureImpl<DescribeReplicaLogDirsResult.ReplicaLogDirInfo>()); + futures.put(replica, new KafkaFutureImpl<>()); } Map<Integer, Set<TopicPartition>> partitionsByBroker = new HashMap<>(); for (TopicPartitionReplica replica: replicas) { if (!partitionsByBroker.containsKey(replica.brokerId())) - partitionsByBroker.put(replica.brokerId(), new HashSet<TopicPartition>()); + partitionsByBroker.put(replica.brokerId(), new HashSet<>()); partitionsByBroker.get(replica.brokerId()).add(new TopicPartition(replica.topic(), replica.partition())); } @@ -2074,7 +2069,7 @@ public class KafkaAdminClient extends AdminClient { }, now); } - return new DescribeReplicaLogDirsResult(new HashMap<TopicPartitionReplica, KafkaFuture<ReplicaLogDirInfo>>(futures)); + return new DescribeReplicaLogDirsResult(new HashMap<>(futures)); } @Override @@ -2082,7 +2077,7 @@ public class KafkaAdminClient extends AdminClient { final CreatePartitionsOptions options) { final Map<String, KafkaFutureImpl<Void>> futures = new HashMap<>(newPartitions.size()); for (String topic : newPartitions.keySet()) { - futures.put(topic, new KafkaFutureImpl<Void>()); + futures.put(topic, new KafkaFutureImpl<>()); } final Map<String, NewPartitions> requestMap = new HashMap<>(newPartitions); @@ -2121,7 +2116,7 @@ public class KafkaAdminClient extends AdminClient { completeAllExceptionally(futures.values(), throwable); } }, now); - return new CreatePartitionsResult(new HashMap<String, KafkaFuture<Void>>(futures)); + return new CreatePartitionsResult(new HashMap<>(futures)); } @Override @@ -2133,7 +2128,7 @@ public class KafkaAdminClient extends AdminClient { final Map<TopicPartition, KafkaFutureImpl<DeletedRecords>> futures = new HashMap<>(recordsToDelete.size()); for (TopicPartition topicPartition: recordsToDelete.keySet()) { - futures.put(topicPartition, new KafkaFutureImpl<DeletedRecords>()); + futures.put(topicPartition, new KafkaFutureImpl<>()); } // preparing topics list for asking metadata about them @@ -2180,7 +2175,7 @@ public class KafkaAdminClient extends AdminClient { Node node = cluster.leaderFor(entry.getKey()); if (node != null) { if (!leaders.containsKey(node)) - leaders.put(node, new HashMap<TopicPartition, Long>()); + leaders.put(node, new HashMap<>()); leaders.get(node).put(entry.getKey(), entry.getValue().beforeOffset()); } else { KafkaFutureImpl<DeletedRecords> future = futures.get(entry.getKey()); @@ -2231,7 +2226,7 @@ public class KafkaAdminClient extends AdminClient { } }, nowMetadata); - return new DeleteRecordsResult(new HashMap<TopicPartition, KafkaFuture<DeletedRecords>>(futures)); + return new DeleteRecordsResult(new HashMap<>(futures)); } @Override @@ -2373,7 +2368,7 @@ public class KafkaAdminClient extends AdminClient { groupId + "' cannot be represented in a request.")); futures.put(groupId, future); } else if (!futures.containsKey(groupId)) { - futures.put(groupId, new KafkaFutureImpl<ConsumerGroupDescription>()); + futures.put(groupId, new KafkaFutureImpl<>()); } } @@ -2469,7 +2464,7 @@ public class KafkaAdminClient extends AdminClient { }, startFindCoordinatorMs); } - return new DescribeConsumerGroupsResult(new HashMap<String, KafkaFuture<ConsumerGroupDescription>>(futures)); + return new DescribeConsumerGroupsResult(new HashMap<>(futures)); } private boolean handleFindCoordinatorError(FindCoordinatorResponse response, KafkaFutureImpl<?> future) { @@ -2518,12 +2513,12 @@ public class KafkaAdminClient extends AdminClient { private synchronized void tryComplete() { if (remaining.isEmpty()) { - ArrayList<Object> results = new ArrayList<Object>(listings.values()); + ArrayList<Object> results = new ArrayList<>(listings.values()); results.addAll(errors); future.complete(results); } } - }; + } @Override public ListConsumerGroupsResult listConsumerGroups(ListConsumerGroupsOptions options) { @@ -2687,7 +2682,7 @@ public class KafkaAdminClient extends AdminClient { groupId + "' cannot be represented in a request.")); futures.put(groupId, future); } else if (!futures.containsKey(groupId)) { - futures.put(groupId, new KafkaFutureImpl<Void>()); + futures.put(groupId, new KafkaFutureImpl<>()); } } @@ -2755,7 +2750,7 @@ public class KafkaAdminClient extends AdminClient { }, startFindCoordinatorMs); } - return new DeleteConsumerGroupsResult(new HashMap<String, KafkaFuture<Void>>(futures)); + return new DeleteConsumerGroupsResult(new HashMap<>(futures)); } @Override