Repository: kafka Updated Branches: refs/heads/trunk c62cff355 -> 991195416
KAFKA-2042; Update topic list of the metadata regardless of cluster information; reviewed by Guozhang Wang Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/99119541 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/99119541 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/99119541 Branch: refs/heads/trunk Commit: 991195416e0c179d2d2a79891d0214244c287618 Parents: c62cff3 Author: Jiangjie Qin <[email protected]> Authored: Tue Mar 24 15:48:46 2015 -0700 Committer: Guozhang Wang <[email protected]> Committed: Tue Mar 24 15:48:46 2015 -0700 ---------------------------------------------------------------------- .../src/main/java/org/apache/kafka/clients/Metadata.java | 9 +++++++++ .../org/apache/kafka/clients/producer/KafkaProducer.java | 9 ++++++--- 2 files changed, 15 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/99119541/clients/src/main/java/org/apache/kafka/clients/Metadata.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java index c8bde8b..07f1cdb 100644 --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java @@ -132,6 +132,15 @@ public final class Metadata { } /** + * Check if a topic is already in the topic set. + * @param topic topic to check + * @return true if the topic exists, false otherwise + */ + public synchronized boolean containsTopic(String topic) { + return this.topics.contains(topic); + } + + /** * Update the cluster metadata */ public synchronized void update(Cluster cluster, long now) { http://git-wip-us.apache.org/repos/asf/kafka/blob/99119541/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index feda9c9..ab26342 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -382,8 +382,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> { this.sender.wakeup(); } return result.future; - // Handling exceptions and record the errors; - // For API exceptions return them in the future, + // handling exceptions and record the errors; + // for API exceptions return them in the future, // for other exceptions throw directly } catch (ApiException e) { log.debug("Exception occurred during message send:", e); @@ -406,6 +406,10 @@ public class KafkaProducer<K, V> implements Producer<K, V> { * @param maxWaitMs The maximum time in ms for waiting on the metadata */ private void waitOnMetadata(String topic, long maxWaitMs) throws InterruptedException { + // add topic to metadata topic list if it is not there already. + if (!this.metadata.containsTopic(topic)) + this.metadata.add(topic); + if (metadata.fetch().partitionsForTopic(topic) != null) { return; } else { @@ -414,7 +418,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> { while (metadata.fetch().partitionsForTopic(topic) == null) { log.trace("Requesting metadata update for topic {}.", topic); int version = metadata.requestUpdate(); - metadata.add(topic); sender.wakeup(); metadata.awaitUpdate(version, remainingWaitMs); long elapsed = time.milliseconds() - begin;
