[https://issues.apache.org/jira/browse/KAFKA-6299?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16469537#comment-16469537]

ASF GitHub Bot commented on KAFKA-6299:
---------------------------------------

hachikuji closed pull request #4295: KAFKA-6299. Fix AdminClient error handling when metadata changes
URL: https://github.com/apache/kafka/pull/4295
 
 
   

This PR was merged from a forked repository.
Because GitHub hides the original diff once a PR is merged, the diff is
reproduced below for the sake of provenance:

Because this is a foreign pull request (from a fork), the diff is included
below; GitHub would not otherwise display it:

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java 
b/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java
index 126728342d4..09ed995d14c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java
+++ b/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java
@@ -29,7 +29,7 @@
  * <p>
  * This class is not thread-safe!
  */
-interface MetadataUpdater {
+public interface MetadataUpdater {
 
     /**
      * Gets the current cluster info without blocking.
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index d8c0bad8066..70e9fbd6fbb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -22,11 +22,11 @@
 import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.clients.ClientUtils;
 import org.apache.kafka.clients.KafkaClient;
-import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResult;
 import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResults;
 import 
org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult.ReplicaLogDirInfo;
+import org.apache.kafka.clients.admin.internal.AdminMetadataManager;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
 import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
@@ -44,7 +44,6 @@
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.errors.ApiException;
 import org.apache.kafka.common.errors.AuthenticationException;
-import org.apache.kafka.common.errors.BrokerNotAvailableException;
 import org.apache.kafka.common.errors.DisconnectException;
 import org.apache.kafka.common.errors.InvalidGroupIdException;
 import org.apache.kafka.common.errors.InvalidRequestException;
@@ -186,9 +185,9 @@
     private final Time time;
 
     /**
-     * The cluster metadata used by the KafkaClient.
+     * The cluster metadata manager used by the KafkaClient.
      */
-    private final Metadata metadata;
+    private final AdminMetadataManager metadataManager;
 
     /**
      * The metrics for this KafkaAdminClient.
@@ -327,8 +326,9 @@ static KafkaAdminClient createInternal(AdminClientConfig 
config, TimeoutProcesso
         try {
             // Since we only request node information, it's safe to pass true 
for allowAutoTopicCreation (and it
             // simplifies communication with older brokers)
-            Metadata metadata = new 
Metadata(config.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG),
-                    config.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG), 
true);
+            AdminMetadataManager metadataManager = new 
AdminMetadataManager(logContext, time,
+                config.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG),
+                config.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG));
             List<MetricsReporter> reporters = 
config.getConfiguredInstances(AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG,
                 MetricsReporter.class);
             Map<String, String> metricTags = 
Collections.singletonMap("client-id", clientId);
@@ -344,7 +344,7 @@ static KafkaAdminClient createInternal(AdminClientConfig 
config, TimeoutProcesso
                     metrics, time, metricGrpPrefix, channelBuilder, 
logContext);
             networkClient = new NetworkClient(
                 selector,
-                metadata,
+                metadataManager.updater(),
                 clientId,
                 1,
                 config.getLong(AdminClientConfig.RECONNECT_BACKOFF_MS_CONFIG),
@@ -356,7 +356,7 @@ static KafkaAdminClient createInternal(AdminClientConfig 
config, TimeoutProcesso
                 true,
                 apiVersions,
                 logContext);
-            return new KafkaAdminClient(config, clientId, time, metadata, 
metrics, networkClient,
+            return new KafkaAdminClient(config, clientId, time, 
metadataManager, metrics, networkClient,
                 timeoutProcessorFactory, logContext);
         } catch (Throwable exc) {
             closeQuietly(metrics, "Metrics");
@@ -367,35 +367,39 @@ static KafkaAdminClient createInternal(AdminClientConfig 
config, TimeoutProcesso
         }
     }
 
-    static KafkaAdminClient createInternal(AdminClientConfig config, 
KafkaClient client, Metadata metadata, Time time) {
+    static KafkaAdminClient createInternal(AdminClientConfig config, 
KafkaClient client, Time time) {
         Metrics metrics = null;
         String clientId = generateClientId(config);
 
         try {
             metrics = new Metrics(new MetricConfig(), new 
LinkedList<MetricsReporter>(), time);
-            return new KafkaAdminClient(config, clientId, time, metadata, 
metrics, client, null,
-                    createLogContext(clientId));
+            LogContext logContext = createLogContext(clientId);
+            AdminMetadataManager metadataManager = new 
AdminMetadataManager(logContext, time,
+                config.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG),
+                config.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG));
+            return new KafkaAdminClient(config, clientId, time, 
metadataManager, metrics,
+                client, null, logContext);
         } catch (Throwable exc) {
             closeQuietly(metrics, "Metrics");
             throw new KafkaException("Failed create new KafkaAdminClient", 
exc);
         }
     }
 
-    private static LogContext createLogContext(String clientId) {
+    static LogContext createLogContext(String clientId) {
         return new LogContext("[AdminClient clientId=" + clientId + "] ");
     }
 
-    private KafkaAdminClient(AdminClientConfig config, String clientId, Time 
time, Metadata metadata,
-                     Metrics metrics, KafkaClient client, 
TimeoutProcessorFactory timeoutProcessorFactory,
-                     LogContext logContext) {
+    private KafkaAdminClient(AdminClientConfig config, String clientId, Time 
time,
+                AdminMetadataManager metadataManager, Metrics metrics, 
KafkaClient client,
+                TimeoutProcessorFactory timeoutProcessorFactory, LogContext 
logContext) {
         this.defaultTimeoutMs = 
config.getInt(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG);
         this.clientId = clientId;
         this.log = logContext.logger(KafkaAdminClient.class);
         this.time = time;
-        this.metadata = metadata;
+        this.metadataManager = metadataManager;
         List<InetSocketAddress> addresses = 
ClientUtils.parseAndValidateAddresses(
             config.getList(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG));
-        this.metadata.update(Cluster.bootstrap(addresses), 
Collections.<String>emptySet(), time.milliseconds());
+        metadataManager.update(Cluster.bootstrap(addresses), 
time.milliseconds(), null);
         this.metrics = metrics;
         this.client = client;
         this.runnable = new AdminClientRunnable();
@@ -462,6 +466,13 @@ public void close(long duration, TimeUnit unit) {
         Node provide();
     }
 
+    private class MetadataUpdateNodeIdProvider implements NodeProvider {
+        @Override
+        public Node provide() {
+            return client.leastLoadedNode(time.milliseconds());
+        }
+    }
+
     private class ConstantNodeIdProvider implements NodeProvider {
         private final int nodeId;
 
@@ -471,7 +482,16 @@ public void close(long duration, TimeUnit unit) {
 
         @Override
         public Node provide() {
-            return metadata.fetch().nodeById(nodeId);
+            if (metadataManager.isReady() &&
+                    (metadataManager.nodeById(nodeId) != null)) {
+                return metadataManager.nodeById(nodeId);
+            }
+            // If we can't find the node with the given constant ID, we 
schedule a
+            // metadata update and hope it appears.  This behavior is useful 
for avoiding
+            // flaky behavior in tests when the cluster is starting up and not 
all nodes
+            // have appeared.
+            metadataManager.requestUpdate();
+            return null;
         }
     }
 
@@ -481,7 +501,12 @@ public Node provide() {
     private class ControllerNodeProvider implements NodeProvider {
         @Override
         public Node provide() {
-            return metadata.fetch().controller();
+            if (metadataManager.isReady() &&
+                    (metadataManager.controller() != null)) {
+                return metadataManager.controller();
+            }
+            metadataManager.requestUpdate();
+            return null;
         }
     }
 
@@ -491,23 +516,40 @@ public Node provide() {
     private class LeastLoadedNodeProvider implements NodeProvider {
         @Override
         public Node provide() {
-            return client.leastLoadedNode(time.milliseconds());
+            if (metadataManager.isReady()) {
+                // This may return null if all nodes are busy.
+                // In that case, we will postpone node assignment.
+                return client.leastLoadedNode(time.milliseconds());
+            }
+            metadataManager.requestUpdate();
+            return null;
         }
     }
 
     abstract class Call {
+        private final boolean internal;
         private final String callName;
         private final long deadlineMs;
         private final NodeProvider nodeProvider;
         private int tries = 0;
         private boolean aborted = false;
+        private Node curNode = null;
 
-        Call(String callName, long deadlineMs, NodeProvider nodeProvider) {
+        Call(boolean internal, String callName, long deadlineMs, NodeProvider 
nodeProvider) {
+            this.internal = internal;
             this.callName = callName;
             this.deadlineMs = deadlineMs;
             this.nodeProvider = nodeProvider;
         }
 
+        Call(String callName, long deadlineMs, NodeProvider nodeProvider) {
+            this(false, callName, deadlineMs, nodeProvider);
+        }
+
+        protected Node curNode() {
+            return curNode;
+        }
+
         /**
          * Handle a failure.
          *
@@ -615,6 +657,10 @@ boolean 
handleUnsupportedVersionException(UnsupportedVersionException exception)
         public String toString() {
             return "Call(callName=" + callName + ", deadlineMs=" + deadlineMs 
+ ")";
         }
+
+        public boolean isInternal() {
+            return internal;
+        }
     }
 
     static class TimeoutProcessorFactory {
@@ -698,43 +744,15 @@ int nextTimeoutMs() {
         private List<Call> newCalls = new LinkedList<>();
 
         /**
-         * Check if the AdminClient metadata is ready.
-         * We need to know who the controller is, and have a non-empty view of 
the cluster.
-         *
-         * @param prevMetadataVersion       The previous metadata version 
which wasn't usable.
-         * @return                          null if the metadata is usable; 
the current metadata
-         *                                  version otherwise
-         */
-        private Integer checkMetadataReady(Integer prevMetadataVersion) {
-            if (prevMetadataVersion != null) {
-                if (prevMetadataVersion == metadata.version())
-                    return prevMetadataVersion;
-            }
-            Cluster cluster = metadata.fetch();
-            if (cluster.nodes().isEmpty()) {
-                log.trace("Metadata is not ready yet. No cluster nodes 
found.");
-                return metadata.requestUpdate();
-            }
-            if (cluster.controller() == null) {
-                log.trace("Metadata is not ready yet. No controller found.");
-                return metadata.requestUpdate();
-            }
-            if (prevMetadataVersion != null) {
-                log.trace("Metadata is now ready.");
-            }
-            return null;
-        }
-
-        /**
-         * Time out the elements in the newCalls list which are expired.
+         * Time out the elements in the pendingCalls list which are expired.
          *
          * @param processor     The timeout processor.
          */
-        private synchronized void timeoutNewCalls(TimeoutProcessor processor) {
-            int numTimedOut = processor.handleTimeouts(newCalls,
+        private void timeoutPendingCalls(TimeoutProcessor processor, 
List<Call> pendingCalls) {
+            int numTimedOut = processor.handleTimeouts(pendingCalls,
                     "Timed out waiting for a node assignment.");
             if (numTimedOut > 0)
-                log.debug("Timed out {} new calls.", numTimedOut);
+                log.debug("Timed out {} pending calls.", numTimedOut);
         }
 
         /**
@@ -743,7 +761,7 @@ private synchronized void timeoutNewCalls(TimeoutProcessor 
processor) {
          * @param processor     The timeout processor.
          * @param callsToSend   A map of nodes to the calls they need to 
handle.
          */
-        private void timeoutCallsToSend(TimeoutProcessor processor, Map<Node, 
List<Call>> callsToSend) {
+        private int timeoutCallsToSend(TimeoutProcessor processor, Map<Node, 
List<Call>> callsToSend) {
             int numTimedOut = 0;
             for (List<Call> callList : callsToSend.values()) {
                 numTimedOut += processor.handleTimeouts(callList,
@@ -751,48 +769,52 @@ private void timeoutCallsToSend(TimeoutProcessor 
processor, Map<Node, List<Call>
             }
             if (numTimedOut > 0)
                 log.debug("Timed out {} call(s) with assigned nodes.", 
numTimedOut);
+            return numTimedOut;
         }
 
         /**
-         * Choose nodes for the calls in the callsToSend list.
+         * Drain all the calls from newCalls into pendingCalls.
          *
          * This function holds the lock for the minimum amount of time, to 
avoid blocking
          * users of AdminClient who will also take the lock to add new calls.
-         *
-         * @param now           The current time in milliseconds.
-         * @param callsToSend   A map of nodes to the calls they need to 
handle.
-         *
          */
-        private void chooseNodesForNewCalls(long now, Map<Node, List<Call>> 
callsToSend) {
-            List<Call> newCallsToAdd = null;
-            synchronized (this) {
-                if (newCalls.isEmpty()) {
-                    return;
-                }
-                newCallsToAdd = newCalls;
-                newCalls = new LinkedList<>();
-            }
-            for (Call call : newCallsToAdd) {
-                chooseNodeForNewCall(now, callsToSend, call);
+        private synchronized void drainNewCalls(ArrayList<Call> pendingCalls) {
+            if (!newCalls.isEmpty()) {
+                pendingCalls.addAll(newCalls);
+                newCalls.clear();
             }
         }
 
         /**
-         * Choose a node for a new call.
+         * Choose nodes for the calls in the pendingCalls list.
          *
          * @param now           The current time in milliseconds.
+         * @param pendingIter   An iterator yielding pending calls.
          * @param callsToSend   A map of nodes to the calls they need to 
handle.
-         * @param call          The call.
+         *
          */
-        private void chooseNodeForNewCall(long now, Map<Node, List<Call>> 
callsToSend, Call call) {
-            Node node = call.nodeProvider.provide();
-            if (node == null) {
-                call.fail(now, new BrokerNotAvailableException(
-                    String.format("Error choosing node for %s: no node 
found.", call.callName)));
-                return;
+        private void chooseNodesForPendingCalls(long now, Iterator<Call> 
pendingIter,
+                Map<Node, List<Call>> callsToSend) {
+            while (pendingIter.hasNext()) {
+                Call call = pendingIter.next();
+                Node node = null;
+                try {
+                    node = call.nodeProvider.provide();
+                } catch (Throwable t) {
+                    // Handle authentication errors while choosing nodes.
+                    log.debug("Unable to choose node for {}", call, t);
+                    pendingIter.remove();
+                    call.fail(now, t);
+                }
+                if (node != null) {
+                    log.trace("Assigned {} to node {}", call, node);
+                    pendingIter.remove();
+                    call.curNode = node;
+                    getOrCreateListValue(callsToSend, node).add(call);
+                } else {
+                    log.trace("Unable to assign {} to a node.", call);
+                }
             }
-            log.trace("Assigned {} to {}", call, node);
-            getOrCreateListValue(callsToSend, node).add(call);
         }
 
         /**
@@ -880,37 +902,6 @@ private void timeoutCallsInFlight(TimeoutProcessor 
processor, Map<String, List<C
                 log.debug("Timed out {} call(s) in flight.", numTimedOut);
         }
 
-        /**
-         * If an authentication exception is encountered with connection to 
any broker,
-         * fail all pending requests.
-         */
-        private void handleAuthenticationException(long now, Map<Node, 
List<Call>> callsToSend) {
-            AuthenticationException authenticationException = 
metadata.getAndClearAuthenticationException();
-            if (authenticationException == null) {
-                for (Node node : callsToSend.keySet()) {
-                    authenticationException = 
client.authenticationException(node);
-                    if (authenticationException != null)
-                        break;
-                }
-            }
-            if (authenticationException != null) {
-                synchronized (this) {
-                    failCalls(now, newCalls, authenticationException);
-                }
-                for (List<Call> calls : callsToSend.values()) {
-                    failCalls(now, calls, authenticationException);
-                }
-                callsToSend.clear();
-            }
-        }
-
-        private void failCalls(long now, List<Call> calls, 
AuthenticationException authenticationException) {
-            for (Call call : calls) {
-                call.fail(now, authenticationException);
-            }
-            calls.clear();
-        }
-
         /**
          * Handle responses from the server.
          *
@@ -952,9 +943,14 @@ private void handleResponses(long now,
                 if (response.versionMismatch() != null) {
                     call.fail(now, response.versionMismatch());
                 } else if (response.wasDisconnected()) {
-                    call.fail(now, new DisconnectException(String.format(
-                        "Cancelled %s request with correlation id %s due to 
node %s being disconnected",
-                        call.callName, correlationId, 
response.destination())));
+                    AuthenticationException authException = 
client.authenticationException(call.curNode());
+                    if (authException != null) {
+                        call.fail(now, authException);
+                    } else {
+                        call.fail(now, new DisconnectException(String.format(
+                            "Cancelled %s request with correlation id %s due 
to node %s being disconnected",
+                            call.callName, correlationId, 
response.destination())));
+                    }
                 } else {
                     try {
                         call.handleResponse(response.responseBody());
@@ -970,13 +966,41 @@ private void handleResponses(long now,
             }
         }
 
-        private synchronized boolean threadShouldExit(long now, long 
curHardShutdownTimeMs,
+        private boolean hasActiveExternalCalls(Collection<Call> calls) {
+            for (Call call : calls) {
+                if (!call.isInternal()) {
+                    return true;
+                }
+            }
+            return false;
+        }
+
+        /**
+         * Return true if there are currently active external calls.
+         */
+        private boolean hasActiveExternalCalls(List<Call> pendingCalls,
                 Map<Node, List<Call>> callsToSend, Map<Integer, Call> 
correlationIdToCalls) {
-            if (newCalls.isEmpty() && callsToSend.isEmpty() && 
correlationIdToCalls.isEmpty()) {
+            if (hasActiveExternalCalls(pendingCalls)) {
+                return true;
+            }
+            for (List<Call> callList : callsToSend.values()) {
+                if (hasActiveExternalCalls(callList)) {
+                    return true;
+                }
+            }
+            if (hasActiveExternalCalls(correlationIdToCalls.values())) {
+                return true;
+            }
+            return false;
+        }
+
+        private boolean threadShouldExit(long now, long curHardShutdownTimeMs, 
List<Call> pendingCalls,
+                Map<Node, List<Call>> callsToSend, Map<Integer, Call> 
correlationIdToCalls) {
+            if (!hasActiveExternalCalls(pendingCalls, callsToSend, 
correlationIdToCalls)) {
                 log.trace("All work has been completed, and the I/O thread is 
now exiting.");
                 return true;
             }
-            if (now > curHardShutdownTimeMs) {
+            if (now >= curHardShutdownTimeMs) {
                 log.info("Forcing a hard I/O thread shutdown. Requests in 
progress will be aborted.");
                 return true;
             }
@@ -986,38 +1010,46 @@ private synchronized boolean threadShouldExit(long now, 
long curHardShutdownTime
 
         @Override
         public void run() {
-            /*
+            /**
+             * Calls which have not yet been assigned to a node.
+             * Only accessed from this thread.
+             */
+            ArrayList<Call> pendingCalls = new ArrayList<>();
+
+            /**
              * Maps nodes to calls that we want to send.
+             * Only accessed from this thread.
              */
             Map<Node, List<Call>> callsToSend = new HashMap<>();
 
-            /*
+            /**
              * Maps node ID strings to calls that have been sent.
+             * Only accessed from this thread.
              */
             Map<String, List<Call>> callsInFlight = new HashMap<>();
 
-            /*
+            /**
              * Maps correlation IDs to calls that have been sent.
+             * Only accessed from this thread.
              */
             Map<Integer, Call> correlationIdToCalls = new HashMap<>();
 
-            /*
-             * The previous metadata version which wasn't usable, or null if 
there is none.
-             */
-            Integer prevMetadataVersion = null;
-
             long now = time.milliseconds();
             log.trace("Thread starting");
             while (true) {
+                // Copy newCalls into pendingCalls.
+                drainNewCalls(pendingCalls);
+
                 // Check if the AdminClient thread should shut down.
                 long curHardShutdownTimeMs = hardShutdownTimeMs.get();
                 if ((curHardShutdownTimeMs != INVALID_SHUTDOWN_TIME) &&
-                        threadShouldExit(now, curHardShutdownTimeMs, 
callsToSend, correlationIdToCalls))
+                        threadShouldExit(now, curHardShutdownTimeMs, 
pendingCalls,
+                            callsToSend, correlationIdToCalls))
                     break;
 
                 // Handle timeouts.
                 TimeoutProcessor timeoutProcessor = 
timeoutProcessorFactory.create(now);
-                timeoutNewCalls(timeoutProcessor);
+                timeoutPendingCalls(timeoutProcessor, pendingCalls);
                 timeoutCallsToSend(timeoutProcessor, callsToSend);
                 timeoutCallsInFlight(timeoutProcessor, callsInFlight);
 
@@ -1026,12 +1058,22 @@ public void run() {
                     pollTimeout = Math.min(pollTimeout, curHardShutdownTimeMs 
- now);
                 }
 
-                // Handle new calls and metadata update requests.
-                prevMetadataVersion = checkMetadataReady(prevMetadataVersion);
-                if (prevMetadataVersion == null) {
-                    chooseNodesForNewCalls(now, callsToSend);
-                    pollTimeout = Math.min(pollTimeout,
-                        sendEligibleCalls(now, callsToSend, 
correlationIdToCalls, callsInFlight));
+                // Choose nodes for our pending calls.
+                chooseNodesForPendingCalls(now, pendingCalls.iterator(), 
callsToSend);
+                long metadataFetchDelayMs = 
metadataManager.metadataFetchDelayMs(now);
+                if (metadataFetchDelayMs == 0) {
+                    metadataManager.transitionToUpdatePending(now);
+                    Call metadataCall = makeMetadataCall(now);
+                    // Create a new metadata fetch call and add it to the end 
of pendingCalls.
+                    // Assign a node for just the new call (we handled the 
other pending nodes above).
+                    pendingCalls.add(metadataCall);
+                    chooseNodesForPendingCalls(now, 
pendingCalls.listIterator(pendingCalls.size() - 1),
+                        callsToSend);
+                }
+                pollTimeout = Math.min(pollTimeout,
+                    sendEligibleCalls(now, callsToSend, correlationIdToCalls, 
callsInFlight));
+                if (metadataFetchDelayMs > 0) {
+                    pollTimeout = Math.min(pollTimeout, metadataFetchDelayMs);
                 }
 
                 // Wait for network responses.
@@ -1041,7 +1083,6 @@ public void run() {
 
                 // Update the current time and handle the latest responses.
                 now = time.milliseconds();
-                handleAuthenticationException(now, callsToSend);
                 handleResponses(now, responses, callsInFlight, 
correlationIdToCalls);
             }
             int numTimedOut = 0;
@@ -1051,10 +1092,13 @@ public void run() {
                         "The AdminClient thread has exited.");
                 newCalls = null;
             }
+            numTimedOut += timeoutProcessor.handleTimeouts(pendingCalls,
+                "The AdminClient thread has exited.");
+            numTimedOut += timeoutCallsToSend(timeoutProcessor, callsToSend);
             numTimedOut += 
timeoutProcessor.handleTimeouts(correlationIdToCalls.values(),
                     "The AdminClient thread has exited.");
             if (numTimedOut > 0) {
-                log.debug("Timed out {} remaining operations.", numTimedOut);
+                log.debug("Timed out {} remaining operation(s).", numTimedOut);
             }
             closeQuietly(client, "KafkaClient");
             closeQuietly(metrics, "Metrics");
@@ -1106,6 +1150,40 @@ void call(Call call, long now) {
                 enqueue(call, now);
             }
         }
+
+        /**
+         * Create a new metadata call.
+         */
+        private Call makeMetadataCall(long now) {
+            return new Call(true, "fetchMetadata", calcDeadlineMs(now, 
defaultTimeoutMs),
+                    new MetadataUpdateNodeIdProvider()) {
+                @Override
+                public AbstractRequest.Builder createRequest(int timeoutMs) {
+                    // Since this only requests node information, it's safe to 
pass true
+                    // for allowAutoTopicCreation (and it simplifies 
communication with
+                    // older brokers)
+                    return new 
MetadataRequest.Builder(Collections.<String>emptyList(), true);
+                }
+
+                @Override
+                public void handleResponse(AbstractResponse abstractResponse) {
+                    MetadataResponse response = (MetadataResponse) 
abstractResponse;
+                    metadataManager.update(response.cluster(), 
time.milliseconds(), null);
+                }
+
+                @Override
+                public void handleFailure(Throwable e) {
+                    if (e instanceof AuthenticationException) {
+                        log.info("Unable to fetch cluster metadata from node 
{} because of " +
+                            "authentication error", curNode(), e);
+                        metadataManager.update(Cluster.empty(), 
time.milliseconds(), (AuthenticationException) e);
+                    } else {
+                        log.info("Unable to fetch cluster metadata from node 
{}",
+                            curNode(), e);
+                    }
+                }
+            };
+        }
     }
 
     /**
@@ -1149,6 +1227,14 @@ public CreateTopicsResult createTopics(final 
Collection<NewTopic> newTopics,
             @Override
             public void handleResponse(AbstractResponse abstractResponse) {
                 CreateTopicsResponse response = (CreateTopicsResponse) 
abstractResponse;
+                // Check for controller change
+                for (ApiError error : response.errors().values()) {
+                    if (error.error() == Errors.NOT_CONTROLLER) {
+                        metadataManager.clearController();
+                        metadataManager.requestUpdate();
+                        throw error.exception();
+                    }
+                }
                 // Handle server responses for particular topics.
                 for (Map.Entry<String, ApiError> entry : 
response.errors().entrySet()) {
                     KafkaFutureImpl<Void> future = 
topicFutures.get(entry.getKey());
@@ -1212,6 +1298,14 @@ public DeleteTopicsResult 
deleteTopics(Collection<String> topicNames,
             @Override
             void handleResponse(AbstractResponse abstractResponse) {
                 DeleteTopicsResponse response = (DeleteTopicsResponse) 
abstractResponse;
+                // Check for controller change
+                for (Errors error : response.errors().values()) {
+                    if (error == Errors.NOT_CONTROLLER) {
+                        metadataManager.clearController();
+                        metadataManager.requestUpdate();
+                        throw error.exception();
+                    }
+                }
                 // Handle server responses for particular topics.
                 for (Map.Entry<String, Errors> entry : 
response.errors().entrySet()) {
                     KafkaFutureImpl<Void> future = 
topicFutures.get(entry.getKey());
@@ -1982,6 +2076,14 @@ public CreatePartitionsResult 
createPartitions(Map<String, NewPartitions> newPar
             @Override
             public void handleResponse(AbstractResponse abstractResponse) {
                 CreatePartitionsResponse response = (CreatePartitionsResponse) 
abstractResponse;
+                // Check for controller change
+                for (ApiError error : response.errors().values()) {
+                    if (error.error() == Errors.NOT_CONTROLLER) {
+                        metadataManager.clearController();
+                        metadataManager.requestUpdate();
+                        throw error.exception();
+                    }
+                }
                 for (Map.Entry<String, ApiError> result : 
response.errors().entrySet()) {
                     KafkaFutureImpl<Void> future = 
futures.get(result.getKey());
                     if (result.getValue().isSuccess()) {
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/internal/AdminMetadataManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/internal/AdminMetadataManager.java
new file mode 100644
index 00000000000..63e7fc8fa0a
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/internal/AdminMetadataManager.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin.internal;
+
+import org.apache.kafka.clients.MetadataUpdater;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Manages the metadata for KafkaAdminClient.
+ *
+ * This class is not thread-safe.  It is only accessed from the AdminClient
+ * service thread (which also uses the NetworkClient).
+ */
+public class AdminMetadataManager {
+    private Logger log;
+
+    /**
+     * The timer.
+     */
+    private final Time time;
+
+    /**
+     * The minimum amount of time that we should wait between subsequent
+     * retries, when fetching metadata.
+     */
+    private final long refreshBackoffMs;
+
+    /**
+     * The minimum amount of time that we should wait before triggering an
+     * automatic metadata refresh.
+     */
+    private final long metadataExpireMs;
+
+    /**
+     * Used to update the NetworkClient metadata.
+     */
+    private final AdminMetadataUpdater updater;
+
+    /**
+     * The current metadata state.
+     */
+    private State state = State.QUIESCENT;
+
+    /**
+     * The time in wall-clock milliseconds when we last updated the metadata.
+     */
+    private long lastMetadataUpdateMs = 0;
+
+    /**
+     * The time in wall-clock milliseconds when we last attempted to fetch new
+     * metadata.
+     */
+    private long lastMetadataFetchAttemptMs = 0;
+
+    /**
+     * The current cluster information.
+     */
+    private Cluster cluster = Cluster.empty();
+
+    /**
+     * If we got an authorization exception when we last attempted to fetch
+     * metadata, this is it; null, otherwise.
+     */
+    private AuthenticationException authException = null;
+
+    public class AdminMetadataUpdater implements MetadataUpdater {
+        @Override
+        public List<Node> fetchNodes() {
+            return cluster.nodes();
+        }
+
+        @Override
+        public boolean isUpdateDue(long now) {
+            return false;
+        }
+
+        @Override
+        public long maybeUpdate(long now) {
+            return Long.MAX_VALUE;
+        }
+
+        @Override
+        public void handleDisconnection(String destination) {
+            // Do nothing
+        }
+
+        @Override
+        public void handleAuthenticationFailure(AuthenticationException e) {
+            log.info("AdminMetadataManager got AuthenticationException", e);
+            update(Cluster.empty(), time.milliseconds(), e);
+        }
+
+        @Override
+        public void handleCompletedMetadataResponse(RequestHeader 
requestHeader, long now, MetadataResponse metadataResponse) {
+            // Do nothing
+        }
+
+        @Override
+        public void requestUpdate() {
+            // Do nothing
+        }
+    }
+
+    /**
+     * The current AdminMetadataManager state.
+     */
+    enum State {
+        QUIESCENT,
+        UPDATE_REQUESTED,
+        UPDATE_PENDING;
+    }
+
+    public AdminMetadataManager(LogContext logContext, Time time, long 
refreshBackoffMs,
+                                long metadataExpireMs) {
+        this.log = logContext.logger(AdminMetadataManager.class);
+        this.time = time;
+        this.refreshBackoffMs = refreshBackoffMs;
+        this.metadataExpireMs = metadataExpireMs;
+        this.updater = new AdminMetadataUpdater();
+    }
+
+    public AdminMetadataUpdater updater() {
+        return updater;
+    }
+
+    public boolean isReady() {
+        if (authException != null) {
+            log.debug("Metadata is not usable: failed to get metadata.", 
authException);
+            throw authException;
+        }
+        if (cluster.nodes().isEmpty()) {
+            log.trace("Metadata is not ready: bootstrap nodes have not been " +
+                "initialized yet.");
+            return false;
+        }
+        if (cluster.isBootstrapConfigured()) {
+            log.trace("Metadata is not ready: we have not fetched metadata 
from " +
+                "the bootstrap nodes yet.");
+            return false;
+        }
+        log.trace("Metadata is ready to use.");
+        return true;
+    }
+
+    public Node controller() {
+        return cluster.controller();
+    }
+
+    public Node nodeById(int nodeId) {
+        return cluster.nodeById(nodeId);
+    }
+
+    public void requestUpdate() {
+        if (state == State.QUIESCENT) {
+            state = State.UPDATE_REQUESTED;
+            log.debug("Requesting metadata update.");
+        }
+    }
+
+    public void clearController() {
+        if (cluster.controller() != null) {
+            log.trace("Clearing cached controller node {}.", 
cluster.controller());
+            this.cluster = new Cluster(cluster.clusterResource().clusterId(),
+                cluster.nodes(),
+                Collections.<PartitionInfo>emptySet(),
+                Collections.<String>emptySet(),
+                Collections.<String>emptySet(),
+                null);
+        }
+    }
+
+    /**
+     * Determine if the AdminClient should fetch new metadata.
+     */
+    public long metadataFetchDelayMs(long now) {
+        switch (state) {
+            case QUIESCENT:
+                // Calculate the time remaining until the next periodic update.
+                // We want to avoid making many metadata requests in a short 
amount of time,
+                // so there is a metadata refresh backoff period.
+                long timeSinceUpdate = now - lastMetadataUpdateMs;
+                long timeRemainingUntilUpdate = metadataExpireMs - 
timeSinceUpdate;
+                long timeSinceAttempt = now - lastMetadataFetchAttemptMs;
+                long timeRemainingUntilAttempt = refreshBackoffMs - 
timeSinceAttempt;
+                return Math.max(Math.max(0L, timeRemainingUntilUpdate), 
timeRemainingUntilAttempt);
+            case UPDATE_REQUESTED:
+                // An update has been explicitly requested.  Do it as soon as 
possible.
+                return 0;
+            default:
+                // An update is already pending, so we don't need to initiate 
another one.
+                return Long.MAX_VALUE;
+        }
+    }
+
+    /**
+     * Transition into the UPDATE_PENDING state.  Updates 
lastMetadataFetchAttemptMs.
+     */
+    public void transitionToUpdatePending(long now) {
+        this.state = State.UPDATE_PENDING;
+        this.lastMetadataFetchAttemptMs = now;
+    }
+
+    /**
+     * Receive new metadata, and transition into the QUIESCENT state.
+     * Updates lastMetadataUpdateMs, cluster, and authException.
+     */
+    public void update(Cluster cluster, long now, AuthenticationException 
authException) {
+        if (cluster.isBootstrapConfigured()) {
+            log.debug("Setting bootstrap cluster metadata {}.", cluster);
+        } else {
+            log.debug("Received cluster metadata {}{}.",
+                cluster, authException == null ? "" : " with authentication 
exception.");
+        }
+        this.state = State.QUIESCENT;
+        this.lastMetadataUpdateMs = now;
+        this.authException = authException;
+        if (!cluster.nodes().isEmpty()) {
+            this.cluster = cluster;
+        }
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java 
b/clients/src/main/java/org/apache/kafka/common/Cluster.java
index 0c59f33824d..ccbaa306d48 100644
--- a/clients/src/main/java/org/apache/kafka/common/Cluster.java
+++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java
@@ -274,7 +274,8 @@ public Node controller() {
 
     @Override
     public String toString() {
-        return "Cluster(id = " + clusterResource.clusterId() + ", nodes = " + 
this.nodes + ", partitions = " + this.partitionsByTopicPartition.values() + ")";
+        return "Cluster(id = " + clusterResource.clusterId() + ", nodes = " + 
this.nodes +
+            ", partitions = " + this.partitionsByTopicPartition.values() + ", 
controller = " + controller + ")";
     }
 
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java 
b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 37b43e5e36e..7a8ba1c6b29 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -81,6 +81,7 @@ public FutureResponse(Node node,
     private Node node = null;
     private final Set<String> ready = new HashSet<>();
     private final Map<Node, Long> blackedOut = new HashMap<>();
+    private final Map<Node, Long> pendingAuthenticationErrors = new 
HashMap<>();
     private final Map<Node, AuthenticationException> authenticationErrors = 
new HashMap<>();
     // Use concurrent queue for requests so that requests may be queried from 
a different thread
     private final Queue<ClientRequest> requests = new 
ConcurrentLinkedDeque<>();
@@ -128,11 +129,16 @@ public void blackout(Node node, long duration) {
     }
 
     public void authenticationFailed(Node node, long duration) {
+        pendingAuthenticationErrors.remove(node);
         authenticationErrors.put(node, (AuthenticationException) 
Errors.SASL_AUTHENTICATION_FAILED.exception());
         disconnect(node.idString());
         blackout(node, duration);
     }
 
+    public void createPendingAuthenticationError(Node node, long blackoutMs) {
+        pendingAuthenticationErrors.put(node, blackoutMs);
+    }
+
     private boolean isBlackedOut(Node node) {
         if (blackedOut.containsKey(node)) {
             long expiration = blackedOut.get(node);
@@ -174,6 +180,26 @@ public void disconnect(String node) {
 
     @Override
     public void send(ClientRequest request, long now) {
+        // Check if the request is directed to a node with a pending 
authentication error.
+        for (Iterator<Map.Entry<Node, Long>> authErrorIter =
+             pendingAuthenticationErrors.entrySet().iterator(); 
authErrorIter.hasNext(); ) {
+            Map.Entry<Node, Long> entry = authErrorIter.next();
+            Node node = entry.getKey();
+            long blackoutMs = entry.getValue();
+            if (node.idString().equals(request.destination())) {
+                authErrorIter.remove();
+                // Set up a disconnected ClientResponse and create an 
authentication error
+                // for the affected node.
+                authenticationFailed(node, blackoutMs);
+                AbstractRequest.Builder<?> builder = request.requestBuilder();
+                short version = 
nodeApiVersions.latestUsableVersion(request.apiKey(), 
builder.oldestAllowedVersion(),
+                    builder.latestAllowedVersion());
+                ClientResponse resp = new 
ClientResponse(request.makeHeader(version), request.callback(), 
request.destination(),
+                    request.createdTimeMs(), time.milliseconds(), true, null, 
null);
+                responses.add(resp);
+                return;
+            }
+        }
         Iterator<FutureResponse> iterator = futureResponses.iterator();
         while (iterator.hasNext()) {
             FutureResponse futureResp = iterator.next();
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java
index 10281fb6ffa..f862c140c0d 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java
@@ -16,11 +16,12 @@
  */
 package org.apache.kafka.clients.admin;
 
-import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.utils.Time;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -50,13 +51,25 @@ public AdminClientUnitTestEnv(Time time, Cluster cluster, 
String...vals) {
     }
 
     public AdminClientUnitTestEnv(Time time, Cluster cluster, Map<String, 
Object> config) {
+        this(newMockClient(time, cluster), time, cluster, config);
+    }
+
+    private static MockClient newMockClient(Time time, Cluster cluster) {
+        MockClient mockClient = new MockClient(time);
+        mockClient.prepareResponse(new MetadataResponse(cluster.nodes(),
+            cluster.clusterResource().clusterId(),
+            cluster.controller().id(),
+            Collections.<MetadataResponse.TopicMetadata>emptyList()));
+        return mockClient;
+    }
+
+    public AdminClientUnitTestEnv(MockClient mockClient, Time time, Cluster 
cluster,
+                                  Map<String, Object> config) {
         this.time = time;
         this.cluster = cluster;
         AdminClientConfig adminClientConfig = new AdminClientConfig(config);
-        Metadata metadata = new 
Metadata(adminClientConfig.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG),
-                
adminClientConfig.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG), false);
-        this.mockClient = new MockClient(time, metadata);
-        this.adminClient = KafkaAdminClient.createInternal(adminClientConfig, 
mockClient, metadata, time);
+        this.mockClient = mockClient;
+        this.adminClient = KafkaAdminClient.createInternal(adminClientConfig, 
mockClient, time);
     }
 
     public Time time() {
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 0debed3dead..cdd9a282a3d 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.clients.admin;
 
+import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.NodeApiVersions;
 import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResults;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -39,6 +40,7 @@
 import org.apache.kafka.common.errors.LeaderNotAvailableException;
 import org.apache.kafka.common.errors.NotLeaderForPartitionException;
 import org.apache.kafka.common.errors.OffsetOutOfRangeException;
+import org.apache.kafka.common.errors.SaslAuthenticationException;
 import org.apache.kafka.common.errors.SecurityDisabledException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
@@ -65,6 +67,8 @@
 import org.apache.kafka.common.resource.ResourceFilter;
 import org.apache.kafka.common.resource.ResourceType;
 import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Ignore;
@@ -86,6 +90,7 @@
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 import static java.util.Arrays.asList;
 import static org.apache.kafka.common.requests.ResourceType.BROKER;
@@ -169,15 +174,18 @@ public void testGenerateClientId() {
                 
KafkaAdminClient.generateClientId(newConfMap(AdminClientConfig.CLIENT_ID_CONFIG,
 "myCustomId")));
     }
 
-    private static AdminClientUnitTestEnv mockClientEnv(String... configVals) {
+    private static Cluster mockCluster(int controllerIndex) {
         HashMap<Integer, Node> nodes = new HashMap<>();
         nodes.put(0, new Node(0, "localhost", 8121));
         nodes.put(1, new Node(1, "localhost", 8122));
         nodes.put(2, new Node(2, "localhost", 8123));
-        Cluster cluster = new Cluster("mockClusterId", nodes.values(),
+        return new Cluster("mockClusterId", nodes.values(),
                 Collections.<PartitionInfo>emptySet(), 
Collections.<String>emptySet(),
-                Collections.<String>emptySet(), nodes.get(0));
-        return new AdminClientUnitTestEnv(cluster, configVals);
+                Collections.<String>emptySet(), nodes.get(controllerIndex));
+    }
+
+    private static AdminClientUnitTestEnv mockClientEnv(String... configVals) {
+        return new AdminClientUnitTestEnv(mockCluster(0), configVals);
     }
 
     @Test
@@ -204,7 +212,13 @@ private static void assertFutureError(Future<?> future, 
Class<? extends Throwabl
      */
     @Test
     public void testTimeoutWithoutMetadata() throws Exception {
-        try (AdminClientUnitTestEnv env = 
mockClientEnv(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10")) {
+        Cluster cluster = mockCluster(0);
+        MockClient mockClient = new MockClient(Time.SYSTEM);
+        try (final AdminClientUnitTestEnv env = new 
AdminClientUnitTestEnv(mockClient,
+                Time.SYSTEM,
+                cluster,
+                newStrMap(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:8121",
+                    AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10"))) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
             env.kafkaClient().setNode(new Node(0, "localhost", 8121));
             env.kafkaClient().prepareResponse(new 
CreateTopicsResponse(Collections.singletonMap("myTopic", new 
ApiError(Errors.NONE, ""))));
@@ -215,6 +229,30 @@ public void testTimeoutWithoutMetadata() throws Exception {
         }
     }
 
+    /**
+     * Test that we propagate exceptions encountered when fetching metadata.
+     */
+    @Test
+    public void testPropagatedMetadataFetchException() throws Exception {
+        Cluster cluster = mockCluster(0);
+        MockClient mockClient = new MockClient(Time.SYSTEM);
+        mockClient.createPendingAuthenticationError(cluster.nodeById(0),
+            TimeUnit.DAYS.toMillis(1));
+        try (final AdminClientUnitTestEnv env = new 
AdminClientUnitTestEnv(mockClient,
+                Time.SYSTEM,
+                cluster,
+                newStrMap(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:8121",
+                    AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10"))) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().setNode(env.cluster().nodeById(0));
+            env.kafkaClient().prepareResponse(new 
CreateTopicsResponse(Collections.singletonMap("myTopic", new 
ApiError(Errors.NONE, ""))));
+            KafkaFuture<Void> future = env.adminClient().createTopics(
+                Collections.singleton(new NewTopic("myTopic", 
Collections.singletonMap(Integer.valueOf(0), asList(new Integer[]{0, 1, 2})))),
+                new CreateTopicsOptions().timeoutMs(1000)).all();
+            assertFutureError(future, SaslAuthenticationException.class);
+        }
+    }
+
     @Test
     public void testCreateTopics() throws Exception {
         try (AdminClientUnitTestEnv env = mockClientEnv()) {
@@ -229,6 +267,30 @@ public void testCreateTopics() throws Exception {
         }
     }
 
+    @Test
+    public void testCreateTopicsHandleNotControllerException() throws 
Exception {
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareMetadataUpdate(mockCluster(0), 
Collections.<String>emptySet());
+            env.kafkaClient().prepareMetadataUpdate(mockCluster(1), 
Collections.<String>emptySet());
+            env.kafkaClient().setNode(env.cluster().nodeById(0));
+            env.kafkaClient().prepareResponseFrom(new CreateTopicsResponse(
+                Collections.singletonMap("myTopic", new 
ApiError(Errors.NOT_CONTROLLER, ""))),
+                env.cluster().nodeById(0));
+            env.kafkaClient().prepareResponse(new 
MetadataResponse(env.cluster().nodes(),
+                env.cluster().clusterResource().clusterId(),
+                1,
+                Collections.<MetadataResponse.TopicMetadata>emptyList()));
+            env.kafkaClient().prepareResponseFrom(new CreateTopicsResponse(
+                    Collections.singletonMap("myTopic", new 
ApiError(Errors.NONE, ""))),
+                env.cluster().nodeById(1));
+            KafkaFuture<Void> future = env.adminClient().createTopics(
+                Collections.singleton(new NewTopic("myTopic", 
Collections.singletonMap(Integer.valueOf(0), asList(new Integer[]{0, 1, 2})))),
+                new CreateTopicsOptions().timeoutMs(10000)).all();
+            future.get();
+        }
+    }
+
     @Test
     public void testInvalidTopicNames() throws Exception {
         try (AdminClientUnitTestEnv env = mockClientEnv()) {
@@ -266,33 +328,31 @@ public void testInvalidTopicNames() throws Exception {
     }
 
     @Test
-    public void 
testAdminClientApisWithinBlackoutPeriodAfterAuthenticationFailure() throws 
Exception {
-        AdminClientUnitTestEnv env = mockClientEnv();
-        Node node = env.cluster().controller();
-        env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-        env.kafkaClient().setNode(node);
-        env.kafkaClient().authenticationFailed(node, 300);
-
-        callAdminClientApisAndExpectAnAuthenticationError(env);
-
-        // wait less than the blackout period, the connection should fail and 
the authentication error should remain
-        env.time().sleep(30);
-        assertTrue(env.kafkaClient().connectionFailed(node));
-        callAdminClientApisAndExpectAnAuthenticationError(env);
-
-        env.close();
+    public void testAdminClientApisAuthenticationFailure() throws Exception {
+        Cluster cluster = mockCluster(0);
+        MockClient mockClient = new MockClient(Time.SYSTEM);
+        mockClient.createPendingAuthenticationError(cluster.nodeById(0),
+            TimeUnit.DAYS.toMillis(1));
+        try (final AdminClientUnitTestEnv env = new 
AdminClientUnitTestEnv(mockClient,
+                Time.SYSTEM,
+                cluster,
+                newStrMap(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:8121",
+                     AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000"))) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().setNode(env.cluster().controller());
+            callAdminClientApisAndExpectAnAuthenticationError(env);
+        }
     }
 
     private void 
callAdminClientApisAndExpectAnAuthenticationError(AdminClientUnitTestEnv env) 
throws InterruptedException {
-        env.kafkaClient().prepareMetadataUpdate(env.cluster(), 
Collections.<String>emptySet());
-
         try {
             env.adminClient().createTopics(
                     Collections.singleton(new NewTopic("myTopic", 
Collections.singletonMap(Integer.valueOf(0), asList(new Integer[]{0, 1, 2})))),
                     new CreateTopicsOptions().timeoutMs(10000)).all().get();
             fail("Expected an authentication error.");
         } catch (ExecutionException e) {
-            assertTrue("Expected only an authentication error.", e.getCause() 
instanceof AuthenticationException);
+            assertTrue("Expected an authentication error, but got " + 
Utils.stackTrace(e),
+                e.getCause() instanceof AuthenticationException);
         }
 
         try {
@@ -302,35 +362,40 @@ private void 
callAdminClientApisAndExpectAnAuthenticationError(AdminClientUnitTe
             env.adminClient().createPartitions(counts).all().get();
             fail("Expected an authentication error.");
         } catch (ExecutionException e) {
-            assertTrue("Expected only an authentication error.", e.getCause() 
instanceof AuthenticationException);
+            assertTrue("Expected an authentication error, but got " + 
Utils.stackTrace(e),
+                e.getCause() instanceof AuthenticationException);
         }
 
         try {
             env.adminClient().createAcls(asList(ACL1, ACL2)).all().get();
             fail("Expected an authentication error.");
         } catch (ExecutionException e) {
-            assertTrue("Expected only an authentication error.", e.getCause() 
instanceof AuthenticationException);
+            assertTrue("Expected an authentication error, but got " + 
Utils.stackTrace(e),
+                e.getCause() instanceof AuthenticationException);
         }
 
         try {
             env.adminClient().describeAcls(FILTER1).values().get();
             fail("Expected an authentication error.");
         } catch (ExecutionException e) {
-            assertTrue("Expected only an authentication error.", e.getCause() 
instanceof AuthenticationException);
+            assertTrue("Expected an authentication error, but got " + 
Utils.stackTrace(e),
+                e.getCause() instanceof AuthenticationException);
         }
 
         try {
             env.adminClient().deleteAcls(asList(FILTER1, FILTER2)).all().get();
             fail("Expected an authentication error.");
         } catch (ExecutionException e) {
-            assertTrue("Expected only an authentication error.", e.getCause() 
instanceof AuthenticationException);
+            assertTrue("Expected an authentication error, but got " + 
Utils.stackTrace(e),
+                e.getCause() instanceof AuthenticationException);
         }
 
         try {
             env.adminClient().describeConfigs(Collections.singleton(new 
ConfigResource(ConfigResource.Type.BROKER, "0"))).all().get();
             fail("Expected an authentication error.");
         } catch (ExecutionException e) {
-            assertTrue("Expected only an authentication error.", e.getCause() 
instanceof AuthenticationException);
+            assertTrue("Expected an authentication error, but got " + 
Utils.stackTrace(e),
+                e.getCause() instanceof AuthenticationException);
         }
     }
 
@@ -918,7 +983,7 @@ public FailureInjectingTimeoutProcessor(long now) {
             }
 
             boolean callHasExpired(KafkaAdminClient.Call call) {
-                if (shouldInjectFailure()) {
+                if ((!call.isInternal()) && shouldInjectFailure()) {
                     log.debug("Injecting timeout for {}.", call);
                     return true;
                 } else {
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
index cda68795689..5b1e1553559 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
@@ -52,7 +52,6 @@ public void returnNullWithApiVersionMismatch() {
         try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) 
{
             env.kafkaClient().setNode(cluster.controller());
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-            env.kafkaClient().prepareMetadataUpdate(env.cluster(), 
Collections.<String>emptySet());
             
env.kafkaClient().prepareResponse(createTopicResponseWithUnsupportedVersion(newTopic));
             TopicAdmin admin = new TopicAdmin(null, env.adminClient());
             boolean created = admin.createTopic(newTopic);
@@ -65,7 +64,7 @@ public void returnNullWithClusterAuthorizationFailure() {
         final NewTopic newTopic = 
TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build();
         Cluster cluster = createCluster(1);
         try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) 
{
-            env.kafkaClient().prepareMetadataUpdate(env.cluster(), 
Collections.<String>emptySet());
+            env.kafkaClient().setNode(cluster.nodes().iterator().next());
             
env.kafkaClient().prepareResponse(createTopicResponseWithClusterAuthorizationException(newTopic));
             TopicAdmin admin = new TopicAdmin(null, env.adminClient());
             boolean created = admin.createTopic(newTopic);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log in to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Fix AdminClient error handling when metadata changes
> ----------------------------------------------------
>
>                 Key: KAFKA-6299
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6299
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Colin P. McCabe
>            Assignee: Colin P. McCabe
>            Priority: Major
>             Fix For: 2.0.0
>
>
> * AdminClient should only call Metadata#requestUpdate when needed.
> * AdminClient should retry requests for which the controller has changed.
> * Fix an issue where AdminClient requests might not receive a security exception,
> even when a metadata fetch fails with an authentication exception.
> * Fix a possible issue where AdminClient might leak a socket after the
> timeout expires on a hard close, if a very narrow race condition is hit.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to