This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new a5ea6d1 MINOR: A few small cleanups in AdminClient from KAFKA-6299 (#4989) a5ea6d1 is described below commit a5ea6d10a8152df8f1dcb3d7c3fe3635325d82e4 Author: Jason Gustafson <ja...@confluent.io> AuthorDate: Thu May 10 17:31:12 2018 -0700 MINOR: A few small cleanups in AdminClient from KAFKA-6299 (#4989) Reviewers: Ismael Juma <ism...@juma.me.uk> --- .../kafka/clients/admin/KafkaAdminClient.java | 132 ++++++++------------- .../AdminMetadataManager.java | 2 +- 2 files changed, 53 insertions(+), 81 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 70e9fbd..c9e0e18 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -26,7 +26,7 @@ import org.apache.kafka.clients.NetworkClient; import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResult; import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResults; import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult.ReplicaLogDirInfo; -import org.apache.kafka.clients.admin.internal.AdminMetadataManager; +import org.apache.kafka.clients.admin.internals.AdminMetadataManager; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.internals.ConsumerProtocol; import org.apache.kafka.clients.consumer.internals.PartitionAssignor; @@ -738,6 +738,30 @@ public class KafkaAdminClient extends AdminClient { private final class AdminClientRunnable implements Runnable { /** + * Calls which have not yet been assigned to a node. + * Only accessed from this thread. + */ + private final ArrayList<Call> pendingCalls = new ArrayList<>(); + + /** + * Maps nodes to calls that we want to send. + * Only accessed from this thread. + */ + private final Map<Node, List<Call>> callsToSend = new HashMap<>(); + + /** + * Maps node ID strings to calls that have been sent. + * Only accessed from this thread. + */ + private final Map<String, List<Call>> callsInFlight = new HashMap<>(); + + /** + * Maps correlation IDs to calls that have been sent. + * Only accessed from this thread. + */ + private final Map<Integer, Call> correlationIdToCalls = new HashMap<>(); + + /** * Pending calls. Protected by the object monitor. * This will be null only if the thread has shut down. */ @@ -748,9 +772,8 @@ public class KafkaAdminClient extends AdminClient { * * @param processor The timeout processor. */ - private void timeoutPendingCalls(TimeoutProcessor processor, List<Call> pendingCalls) { - int numTimedOut = processor.handleTimeouts(pendingCalls, - "Timed out waiting for a node assignment."); + private void timeoutPendingCalls(TimeoutProcessor processor) { + int numTimedOut = processor.handleTimeouts(pendingCalls, "Timed out waiting for a node assignment."); if (numTimedOut > 0) log.debug("Timed out {} pending calls.", numTimedOut); } @@ -759,9 +782,8 @@ public class KafkaAdminClient extends AdminClient { * Time out calls which have been assigned to nodes. * * @param processor The timeout processor. - * @param callsToSend A map of nodes to the calls they need to handle. */ - private int timeoutCallsToSend(TimeoutProcessor processor, Map<Node, List<Call>> callsToSend) { + private int timeoutCallsToSend(TimeoutProcessor processor) { int numTimedOut = 0; for (List<Call> callList : callsToSend.values()) { numTimedOut += processor.handleTimeouts(callList, @@ -778,7 +800,7 @@ public class KafkaAdminClient extends AdminClient { * This function holds the lock for the minimum amount of time, to avoid blocking * users of AdminClient who will also take the lock to add new calls. */ - private synchronized void drainNewCalls(ArrayList<Call> pendingCalls) { + private synchronized void drainNewCalls() { if (!newCalls.isEmpty()) { pendingCalls.addAll(newCalls); newCalls.clear(); @@ -790,11 +812,8 @@ public class KafkaAdminClient extends AdminClient { * * @param now The current time in milliseconds. * @param pendingIter An iterator yielding pending calls. - * @param callsToSend A map of nodes to the calls they need to handle. - * */ - private void chooseNodesForPendingCalls(long now, Iterator<Call> pendingIter, - Map<Node, List<Call>> callsToSend) { + private void chooseNodesForPendingCalls(long now, Iterator<Call> pendingIter) { while (pendingIter.hasNext()) { Call call = pendingIter.next(); Node node = null; @@ -821,14 +840,9 @@ public class KafkaAdminClient extends AdminClient { * Send the calls which are ready. * * @param now The current time in milliseconds. - * @param callsToSend The calls to send, by node. - * @param correlationIdToCalls A map of correlation IDs to calls. - * @param callsInFlight A map of nodes to the calls they have in flight. - * * @return The minimum timeout we need for poll(). */ - private long sendEligibleCalls(long now, Map<Node, List<Call>> callsToSend, - Map<Integer, Call> correlationIdToCalls, Map<String, List<Call>> callsInFlight) { + private long sendEligibleCalls(long now) { long pollTimeout = Long.MAX_VALUE; for (Iterator<Map.Entry<Node, List<Call>>> iter = callsToSend.entrySet().iterator(); iter.hasNext(); ) { @@ -872,9 +886,8 @@ public class KafkaAdminClient extends AdminClient { * to time them out is to close the entire connection. * * @param processor The timeout processor. - * @param callsInFlight A map of nodes to the calls they have in flight. */ - private void timeoutCallsInFlight(TimeoutProcessor processor, Map<String, List<Call>> callsInFlight) { + private void timeoutCallsInFlight(TimeoutProcessor processor) { int numTimedOut = 0; for (Map.Entry<String, List<Call>> entry : callsInFlight.entrySet()) { List<Call> contexts = entry.getValue(); @@ -907,18 +920,12 @@ public class KafkaAdminClient extends AdminClient { * * @param now The current time in milliseconds. * @param responses The latest responses from KafkaClient. - * @param correlationIdToCall A map of correlation IDs to calls. - * @param callsInFlight A map of nodes to the calls they have in flight. - **/ - private void handleResponses(long now, - List<ClientResponse> responses, - Map<String, List<Call>> callsInFlight, - Map<Integer, Call> correlationIdToCall) { - + **/ + private void handleResponses(long now, List<ClientResponse> responses) { for (ClientResponse response : responses) { int correlationId = response.requestHeader().correlationId(); - Call call = correlationIdToCall.get(correlationId); + Call call = correlationIdToCalls.get(correlationId); if (call == null) { // If the server returns information about a correlation ID we didn't use yet, // an internal server error has occurred. Close the connection and log an error message. @@ -930,7 +937,7 @@ public class KafkaAdminClient extends AdminClient { } // Stop tracking this call. - correlationIdToCall.remove(correlationId); + correlationIdToCalls.remove(correlationId); List<Call> calls = callsInFlight.get(response.destination()); if ((calls == null) || (!calls.remove(call))) { log.error("Internal server error on {}: ignoring call {} in correlationIdToCall " + @@ -978,8 +985,7 @@ public class KafkaAdminClient extends AdminClient { /** * Return true if there are currently active external calls. */ - private boolean hasActiveExternalCalls(List<Call> pendingCalls, - Map<Node, List<Call>> callsToSend, Map<Integer, Call> correlationIdToCalls) { + private boolean hasActiveExternalCalls() { if (hasActiveExternalCalls(pendingCalls)) { return true; } @@ -988,15 +994,11 @@ public class KafkaAdminClient extends AdminClient { return true; } } - if (hasActiveExternalCalls(correlationIdToCalls.values())) { - return true; - } - return false; + return hasActiveExternalCalls(correlationIdToCalls.values()); } - private boolean threadShouldExit(long now, long curHardShutdownTimeMs, List<Call> pendingCalls, - Map<Node, List<Call>> callsToSend, Map<Integer, Call> correlationIdToCalls) { - if (!hasActiveExternalCalls(pendingCalls, callsToSend, correlationIdToCalls)) { + private boolean threadShouldExit(long now, long curHardShutdownTimeMs) { + if (!hasActiveExternalCalls()) { log.trace("All work has been completed, and the I/O thread is now exiting."); return true; } @@ -1010,48 +1012,22 @@ public class KafkaAdminClient extends AdminClient { @Override public void run() { - /** - * Calls which have not yet been assigned to a node. - * Only accessed from this thread. - */ - ArrayList<Call> pendingCalls = new ArrayList<>(); - - /** - * Maps nodes to calls that we want to send. - * Only accessed from this thread. - */ - Map<Node, List<Call>> callsToSend = new HashMap<>(); - - /** - * Maps node ID strings to calls that have been sent. - * Only accessed from this thread. - */ - Map<String, List<Call>> callsInFlight = new HashMap<>(); - - /** - * Maps correlation IDs to calls that have been sent. - * Only accessed from this thread. - */ - Map<Integer, Call> correlationIdToCalls = new HashMap<>(); - long now = time.milliseconds(); log.trace("Thread starting"); while (true) { // Copy newCalls into pendingCalls. - drainNewCalls(pendingCalls); + drainNewCalls(); // Check if the AdminClient thread should shut down. long curHardShutdownTimeMs = hardShutdownTimeMs.get(); - if ((curHardShutdownTimeMs != INVALID_SHUTDOWN_TIME) && - threadShouldExit(now, curHardShutdownTimeMs, pendingCalls, - callsToSend, correlationIdToCalls)) + if ((curHardShutdownTimeMs != INVALID_SHUTDOWN_TIME) && threadShouldExit(now, curHardShutdownTimeMs)) break; // Handle timeouts. TimeoutProcessor timeoutProcessor = timeoutProcessorFactory.create(now); - timeoutPendingCalls(timeoutProcessor, pendingCalls); - timeoutCallsToSend(timeoutProcessor, callsToSend); - timeoutCallsInFlight(timeoutProcessor, callsInFlight); + timeoutPendingCalls(timeoutProcessor); + timeoutCallsToSend(timeoutProcessor); + timeoutCallsInFlight(timeoutProcessor); long pollTimeout = Math.min(1200000, timeoutProcessor.nextTimeoutMs()); if (curHardShutdownTimeMs != INVALID_SHUTDOWN_TIME) { @@ -1059,7 +1035,7 @@ public class KafkaAdminClient extends AdminClient { } // Choose nodes for our pending calls. - chooseNodesForPendingCalls(now, pendingCalls.iterator(), callsToSend); + chooseNodesForPendingCalls(now, pendingCalls.iterator()); long metadataFetchDelayMs = metadataManager.metadataFetchDelayMs(now); if (metadataFetchDelayMs == 0) { metadataManager.transitionToUpdatePending(now); @@ -1067,11 +1043,9 @@ public class KafkaAdminClient extends AdminClient { // Create a new metadata fetch call and add it to the end of pendingCalls. // Assign a node for just the new call (we handled the other pending nodes above). pendingCalls.add(metadataCall); - chooseNodesForPendingCalls(now, pendingCalls.listIterator(pendingCalls.size() - 1), - callsToSend); + chooseNodesForPendingCalls(now, pendingCalls.listIterator(pendingCalls.size() - 1)); } - pollTimeout = Math.min(pollTimeout, - sendEligibleCalls(now, callsToSend, correlationIdToCalls, callsInFlight)); + pollTimeout = Math.min(pollTimeout, sendEligibleCalls(now)); if (metadataFetchDelayMs > 0) { pollTimeout = Math.min(pollTimeout, metadataFetchDelayMs); } @@ -1083,18 +1057,16 @@ public class KafkaAdminClient extends AdminClient { // Update the current time and handle the latest responses. now = time.milliseconds(); - handleResponses(now, responses, callsInFlight, correlationIdToCalls); + handleResponses(now, responses); } int numTimedOut = 0; TimeoutProcessor timeoutProcessor = new TimeoutProcessor(Long.MAX_VALUE); synchronized (this) { - numTimedOut += timeoutProcessor.handleTimeouts(newCalls, - "The AdminClient thread has exited."); + numTimedOut += timeoutProcessor.handleTimeouts(newCalls, "The AdminClient thread has exited."); newCalls = null; } - numTimedOut += timeoutProcessor.handleTimeouts(pendingCalls, - "The AdminClient thread has exited."); - numTimedOut += timeoutCallsToSend(timeoutProcessor, callsToSend); + numTimedOut += timeoutProcessor.handleTimeouts(pendingCalls, "The AdminClient thread has exited."); + numTimedOut += timeoutCallsToSend(timeoutProcessor); numTimedOut += timeoutProcessor.handleTimeouts(correlationIdToCalls.values(), "The AdminClient thread has exited."); if (numTimedOut > 0) { diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internal/AdminMetadataManager.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java similarity index 99% rename from clients/src/main/java/org/apache/kafka/clients/admin/internal/AdminMetadataManager.java rename to clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java index 63e7fc8..3806560 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internal/AdminMetadataManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.kafka.clients.admin.internal; +package org.apache.kafka.clients.admin.internals; import org.apache.kafka.clients.MetadataUpdater; import org.apache.kafka.common.Cluster; -- To stop receiving notification emails like this one, please contact j...@apache.org.