[ 
https://issues.apache.org/jira/browse/KAFKA-6897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16551829#comment-16551829
 ] 

ASF GitHub Bot commented on KAFKA-6897:
---------------------------------------

hachikuji closed pull request #5027: KAFKA-6897: Prevent producer from blocking 
indefinitely after close
URL: https://github.com/apache/kafka/pull/5027
 
 
   

This is a PR merged from a forked repository. Because GitHub hides the
original diff of a foreign pull request (one opened from a fork) once it
has been merged, the diff is reproduced below for the sake of provenance:

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java 
b/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java
index 8252cf3a9cd..ec007a69668 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java
@@ -89,4 +89,8 @@ public void handleCompletedMetadataResponse(RequestHeader 
requestHeader, long no
     public void requestUpdate() {
         // Do nothing
     }
+
+    @Override
+    public void close() {
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java 
b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index 91b15875cd0..6c663cfac93 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.clients;
 
 import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
@@ -25,6 +26,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -48,7 +50,7 @@
  * is removed from the metadata refresh set after an update. Consumers disable 
topic expiry since they explicitly
  * manage topics while producers rely on topic expiry to limit the refresh set.
  */
-public final class Metadata {
+public final class Metadata implements Closeable {
 
     private static final Logger log = LoggerFactory.getLogger(Metadata.class);
 
@@ -70,6 +72,7 @@
     private boolean needMetadataForAllTopics;
     private final boolean allowAutoTopicCreation;
     private final boolean topicExpiryEnabled;
+    private boolean isClosed;
 
     public Metadata(long refreshBackoffMs, long metadataExpireMs, boolean 
allowAutoTopicCreation) {
         this(refreshBackoffMs, metadataExpireMs, allowAutoTopicCreation, 
false, new ClusterResourceListeners());
@@ -100,6 +103,7 @@ public Metadata(long refreshBackoffMs, long 
metadataExpireMs, boolean allowAutoT
         this.listeners = new ArrayList<>();
         this.clusterResourceListeners = clusterResourceListeners;
         this.needMetadataForAllTopics = false;
+        this.isClosed = false;
     }
 
     /**
@@ -164,12 +168,12 @@ public synchronized AuthenticationException 
getAndClearAuthenticationException()
      * Wait for metadata update until the current version is larger than the 
last version we know of
      */
     public synchronized void awaitUpdate(final int lastVersion, final long 
maxWaitMs) throws InterruptedException {
-        if (maxWaitMs < 0) {
+        if (maxWaitMs < 0)
             throw new IllegalArgumentException("Max time to wait for metadata 
updates should not be < 0 milliseconds");
-        }
+
         long begin = System.currentTimeMillis();
         long remainingWaitMs = maxWaitMs;
-        while (this.version <= lastVersion) {
+        while ((this.version <= lastVersion) && !isClosed()) {
             AuthenticationException ex = getAndClearAuthenticationException();
             if (ex != null)
                 throw ex;
@@ -180,6 +184,8 @@ public synchronized void awaitUpdate(final int lastVersion, 
final long maxWaitMs
                 throw new TimeoutException("Failed to update metadata after " 
+ maxWaitMs + " ms.");
             remainingWaitMs = maxWaitMs - elapsed;
         }
+        if (isClosed())
+            throw new KafkaException("Requested metadata update after close");
     }
 
     /**
@@ -224,6 +230,8 @@ public synchronized boolean containsTopic(String topic) {
      */
     public synchronized void update(Cluster newCluster, Set<String> 
unavailableTopics, long now) {
         Objects.requireNonNull(newCluster, "cluster should not be null");
+        if (isClosed())
+            throw new IllegalStateException("Update requested after metadata 
close");
 
         this.needUpdate = false;
         this.lastRefreshMs = now;
@@ -331,6 +339,25 @@ public synchronized void removeListener(Listener listener) 
{
         this.listeners.remove(listener);
     }
 
+    /**
+     * "Close" this metadata instance to indicate that metadata updates are no 
longer possible. This is typically used
+     * when the thread responsible for performing metadata updates is exiting 
and needs a way to relay this information
+     * to any other thread(s) that could potentially wait on metadata update 
to come through.
+     */
+    @Override
+    public synchronized void close() {
+        this.isClosed = true;
+        this.notifyAll();
+    }
+
+    /**
+     * Check if this metadata instance has been closed. See {@link #close()} 
for more information.
+     * @return True if this instance has been closed; false otherwise
+     */
+    public synchronized boolean isClosed() {
+        return this.isClosed;
+    }
+
     /**
      * MetadataUpdate Listener
      */
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java 
b/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java
index 09ed995d14c..de765db5a8d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java
+++ b/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java
@@ -21,6 +21,7 @@
 import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.requests.RequestHeader;
 
+import java.io.Closeable;
 import java.util.List;
 
 /**
@@ -29,7 +30,7 @@
  * <p>
  * This class is not thread-safe!
  */
-public interface MetadataUpdater {
+public interface MetadataUpdater extends Closeable {
 
     /**
      * Gets the current cluster info without blocking.
@@ -82,4 +83,10 @@
      * start of the update if possible (see `maybeUpdate` for more 
information).
      */
     void requestUpdate();
+
+    /**
+     * Close this updater.
+     */
+    @Override
+    void close();
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 720a7814752..fd16fe608e3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -581,6 +581,7 @@ public void wakeup() {
     @Override
     public void close() {
         this.selector.close();
+        this.metadataUpdater.close();
     }
 
     /**
@@ -981,6 +982,11 @@ public void requestUpdate() {
             this.metadata.requestUpdate();
         }
 
+        @Override
+        public void close() {
+            this.metadata.close();
+        }
+
         /**
          * Return true if there's at least one connection establishment is 
currently underway
          */
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
index 85d3c28e8df..1ad3991ca24 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
@@ -118,6 +118,10 @@ public void handleCompletedMetadataResponse(RequestHeader 
requestHeader, long no
         public void requestUpdate() {
             AdminMetadataManager.this.requestUpdate();
         }
+
+        @Override
+        public void close() {
+        }
     }
 
     /**
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 3a6717b7676..3519947bf15 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -790,12 +790,12 @@ public void abortTransaction() throws 
ProducerFencedException {
      *
      * @throws AuthenticationException if authentication fails. See the 
exception for more details
      * @throws AuthorizationException fatal error indicating that the producer 
is not allowed to write
-     * @throws IllegalStateException if a transactional.id has been configured 
and no transaction has been started
+     * @throws IllegalStateException if a transactional.id has been configured 
and no transaction has been started, or
+     *                               when send is invoked after producer has 
been closed.
      * @throws InterruptException If the thread is interrupted while blocked
      * @throws SerializationException If the key or value are not valid 
objects given the configured serializers
      * @throws TimeoutException If the time taken for fetching metadata or 
allocating memory for the record has surpassed <code>max.block.ms</code>.
      * @throws KafkaException If a Kafka related error occurs that does not 
belong to the public API exceptions.
-     *
      */
     @Override
     public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback 
callback) {
@@ -804,14 +804,29 @@ public void abortTransaction() throws 
ProducerFencedException {
         return doSend(interceptedRecord, callback);
     }
 
+    // Verify that this producer instance has not been closed. This method 
throws IllegalStateException if the producer
+    // has already been closed.
+    private void throwIfProducerClosed() {
+        if (ioThread == null || !ioThread.isAlive())
+            throw new IllegalStateException("Cannot perform operation after 
producer has been closed");
+    }
+
     /**
      * Implementation of asynchronously send a record to a topic.
      */
     private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, 
Callback callback) {
         TopicPartition tp = null;
         try {
+            throwIfProducerClosed();
             // first make sure the metadata for the topic is available
-            ClusterAndWaitTime clusterAndWaitTime = 
waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
+            ClusterAndWaitTime clusterAndWaitTime;
+            try {
+                clusterAndWaitTime = waitOnMetadata(record.topic(), 
record.partition(), maxBlockTimeMs);
+            } catch (KafkaException e) {
+                if (metadata.isClosed())
+                    throw new KafkaException("Producer closed while send in 
progress", e);
+                throw e;
+            }
             long remainingWaitMs = Math.max(0, maxBlockTimeMs - 
clusterAndWaitTime.waitedOnMetadataMs);
             Cluster cluster = clusterAndWaitTime.cluster;
             byte[] serializedKey;
@@ -896,6 +911,7 @@ private void setReadOnly(Headers headers) {
      * @param partition A specific partition expected to exist in metadata, or 
null if there's no preference
      * @param maxWaitMs The maximum time in ms for waiting on the metadata
      * @return The cluster containing topic metadata and the amount of time we 
waited in ms
+     * @throws KafkaException for all Kafka-related exceptions, including the 
case where this method is called after producer close
      */
     private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, 
long maxWaitMs) throws InterruptedException {
         // add topic to metadata topic list if it is not there already and 
reset expiry
@@ -1016,8 +1032,9 @@ public void flush() {
      * Get the partition metadata for the given topic. This can be used for 
custom partitioning.
      * @throws AuthenticationException if authentication fails. See the 
exception for more details
      * @throws AuthorizationException if not authorized to the specified 
topic. See the exception for more details
-     * @throws InterruptException If the thread is interrupted while blocked
+     * @throws InterruptException if the thread is interrupted while blocked
      * @throws TimeoutException if metadata could not be refreshed within 
{@code max.block.ms}
+     * @throws KafkaException for all Kafka-related exceptions, including the 
case where this method is called after producer close
      */
     @Override
     public List<PartitionInfo> partitionsFor(String topic) {
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index 9e9869a7e48..dc00b473027 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -311,9 +311,6 @@ public void close() {
 
     @Override
     public void close(long timeout, TimeUnit timeUnit) {
-        if (this.closed) {
-            throw new IllegalStateException("MockProducer is already closed.");
-        }
         this.closed = true;
     }
 
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index e2b58448dc9..31c6d754c9d 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -19,6 +19,7 @@
 import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
@@ -195,7 +196,7 @@ public RecordAppendResult append(TopicPartition tp,
             Deque<ProducerBatch> dq = getOrCreateDeque(tp);
             synchronized (dq) {
                 if (closed)
-                    throw new IllegalStateException("Cannot send after the 
producer is closed.");
+                    throw new KafkaException("Producer closed while send in 
progress");
                 RecordAppendResult appendResult = tryAppend(timestamp, key, 
value, headers, callback, dq);
                 if (appendResult != null)
                     return appendResult;
@@ -209,7 +210,7 @@ public RecordAppendResult append(TopicPartition tp,
             synchronized (dq) {
                 // Need to check if producer is closed again after grabbing 
the dequeue lock.
                 if (closed)
-                    throw new IllegalStateException("Cannot send after the 
producer is closed.");
+                    throw new KafkaException("Producer closed while send in 
progress");
 
                 RecordAppendResult appendResult = tryAppend(timestamp, key, 
value, headers, callback, dq);
                 if (appendResult != null) {
@@ -700,7 +701,7 @@ public void abortIncompleteBatches() {
      * Go through incomplete batches and abort them.
      */
     private void abortBatches() {
-        abortBatches(new IllegalStateException("Producer is closed 
forcefully."));
+        abortBatches(new KafkaException("Producer is closed forcefully."));
     }
 
     /**
diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java 
b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
index 1188af776aa..969921eceeb 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
@@ -25,6 +25,7 @@
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
@@ -46,7 +47,7 @@
     private long refreshBackoffMs = 100;
     private long metadataExpireMs = 1000;
     private Metadata metadata = new Metadata(refreshBackoffMs, 
metadataExpireMs, true);
-    private AtomicReference<String> backgroundError = new AtomicReference<>();
+    private AtomicReference<Exception> backgroundError = new 
AtomicReference<>();
 
     @After
     public void tearDown() {
@@ -83,6 +84,30 @@ public void testMetadata() throws Exception {
         assertTrue("Update needed due to stale metadata.", 
metadata.timeToNextUpdate(time) == 0);
     }
 
+    @Test
+    public void testMetadataAwaitAfterClose() throws InterruptedException {
+        long time = 0;
+        metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
+        assertFalse("No update needed.", metadata.timeToNextUpdate(time) == 0);
+        metadata.requestUpdate();
+        assertFalse("Still no updated needed due to backoff", 
metadata.timeToNextUpdate(time) == 0);
+        time += refreshBackoffMs;
+        assertTrue("Update needed now that backoff time expired", 
metadata.timeToNextUpdate(time) == 0);
+        String topic = "my-topic";
+        metadata.close();
+        Thread t1 = asyncFetch(topic, 500);
+        t1.join();
+        assertTrue(backgroundError.get().getClass() == KafkaException.class);
+        assertTrue(backgroundError.get().toString().contains("Requested 
metadata update after close"));
+        clearBackgroundError();
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testMetadataUpdateAfterClose() {
+        metadata.close();
+        metadata.update(Cluster.empty(), Collections.<String>emptySet(), 1000);
+    }
+
     private static void checkTimeToNextUpdate(long refreshBackoffMs, long 
metadataExpireMs) {
         long now = 10000;
 
@@ -409,15 +434,18 @@ public void testNonExpiringMetadata() throws Exception {
         assertTrue("Unused topic expired when expiry disabled", 
metadata.containsTopic("topic4"));
     }
 
+    private void clearBackgroundError() {
+        backgroundError.set(null);
+    }
+
     private Thread asyncFetch(final String topic, final long maxWaitMs) {
         Thread thread = new Thread() {
             public void run() {
-                while (metadata.fetch().partitionsForTopic(topic).isEmpty()) {
-                    try {
+                try {
+                    while 
(metadata.fetch().partitionsForTopic(topic).isEmpty())
                         metadata.awaitUpdate(metadata.requestUpdate(), 
maxWaitMs);
-                    } catch (Exception e) {
-                        backgroundError.set(e.toString());
-                    }
+                } catch (Exception e) {
+                    backgroundError.set(e);
                 }
             }
         };
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java 
b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 0f64f13ef60..6b41a9e8779 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -533,6 +533,7 @@ public ClientRequest newClientRequest(String nodeId,
 
     @Override
     public void close() {
+        metadata.close();
     }
 
     @Override
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index bf03e46ec08..dd2dd896b28 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -632,8 +632,8 @@ public void testSendToInvalidTopic() throws Exception {
         client.setNode(node);
 
         Producer<String, String> producer = new KafkaProducer<>(new 
ProducerConfig(
-            ProducerConfig.addSerializerToConfig(props, new 
StringSerializer(), new StringSerializer())),
-            new StringSerializer(), new StringSerializer(), metadata, client);
+                ProducerConfig.addSerializerToConfig(props, new 
StringSerializer(), new StringSerializer())),
+                new StringSerializer(), new StringSerializer(), metadata, 
client);
 
         String invalidTopicName = "topic abc";          // Invalid topic name 
due to space
         ProducerRecord<String, String> record = new 
ProducerRecord<>(invalidTopicName, "HelloKafka");
@@ -641,12 +641,12 @@ public void testSendToInvalidTopic() throws Exception {
         Set<String> invalidTopic = new HashSet<String>();
         invalidTopic.add(invalidTopicName);
         Cluster metaDataUpdateResponseCluster = new 
Cluster(cluster.clusterResource().clusterId(),
-                                                            cluster.nodes(),
-                                                            new 
ArrayList<PartitionInfo>(0),
-                                                            
Collections.<String>emptySet(),
-                                                            invalidTopic,
-                                                            
cluster.internalTopics(),
-                                                            
cluster.controller());
+                cluster.nodes(),
+                new ArrayList<PartitionInfo>(0),
+                Collections.<String>emptySet(),
+                invalidTopic,
+                cluster.internalTopics(),
+                cluster.controller());
         client.prepareMetadataUpdate(metaDataUpdateResponseCluster, 
Collections.<String>emptySet());
 
         Future<RecordMetadata> future = producer.send(record);
@@ -654,4 +654,51 @@ public void testSendToInvalidTopic() throws Exception {
         assertEquals("Cluster has incorrect invalid topic list.", 
metaDataUpdateResponseCluster.invalidTopics(), 
metadata.fetch().invalidTopics());
         TestUtils.assertFutureError(future, InvalidTopicException.class);
     }
+
+    @Test
+    public void testCloseWhenWaitingForMetadataUpdate() throws 
InterruptedException {
+        Properties props = new Properties();
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.MAX_VALUE);
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+
+        // Simulate a case where metadata for a particular topic is not 
available. This will cause KafkaProducer#send to
+        // block in Metadata#awaitUpdate for the configured max.block.ms. When 
close() is invoked, KafkaProducer#send should
+        // return with a KafkaException.
+        String topicName = "test";
+        Time time = new MockTime();
+        Cluster cluster = TestUtils.singletonCluster();
+        Node node = cluster.nodes().get(0);
+        Metadata metadata = new Metadata(0, Long.MAX_VALUE, false);
+        metadata.update(cluster, Collections.<String>emptySet(), 
time.milliseconds());
+        MockClient client = new MockClient(time, metadata);
+        client.setNode(node);
+
+        Producer<String, String> producer = new KafkaProducer<>(
+                new ProducerConfig(ProducerConfig.addSerializerToConfig(props, 
new StringSerializer(), new StringSerializer())),
+                new StringSerializer(), new StringSerializer(), metadata, 
client);
+
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        final AtomicReference<Exception> sendException = new 
AtomicReference<>();
+
+        try {
+            executor.submit(() -> {
+                try {
+                    // Metadata for topic "test" will not be available which 
will cause us to block indefinitely until
+                    // KafkaProducer#close is invoked.
+                    producer.send(new ProducerRecord<>(topicName, "key", 
"value"));
+                    fail();
+                } catch (Exception e) {
+                    sendException.set(e);
+                }
+            });
+
+            // Wait until metadata update for the topic has been requested
+            TestUtils.waitForCondition(() -> 
metadata.containsTopic(topicName), "Timeout when waiting for topic to be added 
to metadata");
+            producer.close(0, TimeUnit.MILLISECONDS);
+            TestUtils.waitForCondition(() -> sendException.get() != null, "No 
producer exception within timeout");
+            assertEquals(KafkaException.class, sendException.get().getClass());
+        } finally {
+            executor.shutdownNow();
+        }
+    }
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java 
b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
index 27fac280afc..7a8c710b76b 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
@@ -636,16 +636,6 @@ public void 
shouldThrowOnAbortTransactionIfProducerIsClosed() {
         } catch (IllegalStateException e) { }
     }
 
-    @Test
-    public void shouldThrowOnCloseIfProducerIsClosed() {
-        buildMockProducer(true);
-        producer.close();
-        try {
-            producer.close();
-            fail("Should have thrown as producer is already closed");
-        } catch (IllegalStateException e) { }
-    }
-
     @Test
     public void shouldThrowOnFenceProducerIfProducerIsClosed() {
         buildMockProducer(true);
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala 
b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 92396a7bdab..9cc6ebe1c86 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -31,7 +31,7 @@ import kafka.utils.{CommandLineUtils, CoreUtils, Logging, 
Whitelist}
 import org.apache.kafka.clients.consumer.{CommitFailedException, Consumer, 
ConsumerConfig, ConsumerRebalanceListener, ConsumerRecord, KafkaConsumer, 
OffsetAndMetadata}
 import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, 
ProducerRecord, RecordMetadata}
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.errors.WakeupException
@@ -357,6 +357,8 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
               trace("Caught NoRecordsException, continue iteration.")
             case _: WakeupException =>
               trace("Caught WakeupException, continue iteration.")
+            case e: KafkaException if (shuttingDown || exitingOnSendFailure) =>
+              trace(s"Ignoring caught KafkaException during shutdown. 
sendFailure: $exitingOnSendFailure.", e)
           }
           maybeFlushAndCommitOffsets()
         }
diff --git 
a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala 
b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index ee0e90f1807..dc4041f1d63 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -35,6 +35,7 @@ import org.junit.Assert._
 import org.junit.{After, Before, Test}
 
 import scala.collection.mutable.{ArrayBuffer, Buffer}
+import scala.concurrent.ExecutionException
 
 abstract class BaseProducerSendTest extends KafkaServerTestHarness {
 
@@ -446,8 +447,7 @@ abstract class BaseProducerSendTest extends 
KafkaServerTestHarness {
           future.get()
           fail("No message should be sent successfully.")
         } catch {
-          case e: Exception =>
-            assertEquals("java.lang.IllegalStateException: Producer is closed 
forcefully.", e.getMessage)
+          case e: ExecutionException => assertEquals(classOf[KafkaException], 
e.getCause.getClass)
         }
       }
       assertEquals("Fetch response should have no message returned.", 0, 
consumer.poll(50).count)
diff --git 
a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 
b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index 7969485d81b..9b77c2d4169 100644
--- 
a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -205,7 +205,7 @@ class ProducerFailureHandlingTest extends 
KafkaServerTestHarness {
     // create topic
     createTopic(topic1, replicationFactor = numServers)
 
-    val record = new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, 
"key".getBytes, "value".getBytes)
+    val record = new ProducerRecord[Array[Byte], Array[Byte]](topic1, null, 
"key".getBytes, "value".getBytes)
 
     // first send a message to make sure the metadata is refreshed
     producer1.send(record).get


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Mirrormaker waits to shut down forever on produce failure with 
> abort.on.send.failure=true 
> ------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-6897
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6897
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 1.1.0
>            Reporter: Koelli Mungee
>            Assignee: Dhruvil Shah
>            Priority: Major
>             Fix For: 2.1.0
>
>         Attachments: mirror_maker_thread_dump.log
>
>
> MirrorMaker never shuts down after a produce failure when 
> abort.on.send.failure=true:
> {code:java}
> [2018-05-07 08:29:32,417] ERROR Error when sending message to topic test with 
> key: 52 bytes, value: 32615 bytes with error: 
> (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
> org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for 
> test-11: 45646 ms has passed since last append
> [2018-05-07 08:29:32,434] INFO Closing producer due to send failure. 
> (kafka.tools.MirrorMaker$)
> [2018-05-07 08:29:32,434] INFO [Producer clientId=producer-1] Closing the 
> Kafka producer with timeoutMillis = 0 ms. 
> (org.apache.kafka.clients.producer.KafkaProducer)
> {code}
> A stack trace of this MirrorMaker process taken 9 hours later shows that the 
> main thread is still active, waiting for metadata that it will never get 
> because the producer send thread is no longer running.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to