[ https://issues.apache.org/jira/browse/KAFKA-6897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16551829#comment-16551829 ]
ASF GitHub Bot commented on KAFKA-6897: --------------------------------------- hachikuji closed pull request #5027: KAFKA-6897: Prevent producer from blocking indefinitely after close URL: https://github.com/apache/kafka/pull/5027 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java b/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java index 8252cf3a9cd..ec007a69668 100644 --- a/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java +++ b/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java @@ -89,4 +89,8 @@ public void handleCompletedMetadataResponse(RequestHeader requestHeader, long no public void requestUpdate() { // Do nothing } + + @Override + public void close() { + } } diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java index 91b15875cd0..6c663cfac93 100644 --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java @@ -17,6 +17,7 @@ package org.apache.kafka.clients; import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.internals.ClusterResourceListeners; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; @@ -25,6 +26,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.Closeable; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -48,7 +50,7 @@ * is removed from the metadata refresh set after an update. Consumers disable topic expiry since they explicitly * manage topics while producers rely on topic expiry to limit the refresh set. */ -public final class Metadata { +public final class Metadata implements Closeable { private static final Logger log = LoggerFactory.getLogger(Metadata.class); @@ -70,6 +72,7 @@ private boolean needMetadataForAllTopics; private final boolean allowAutoTopicCreation; private final boolean topicExpiryEnabled; + private boolean isClosed; public Metadata(long refreshBackoffMs, long metadataExpireMs, boolean allowAutoTopicCreation) { this(refreshBackoffMs, metadataExpireMs, allowAutoTopicCreation, false, new ClusterResourceListeners()); @@ -100,6 +103,7 @@ public Metadata(long refreshBackoffMs, long metadataExpireMs, boolean allowAutoT this.listeners = new ArrayList<>(); this.clusterResourceListeners = clusterResourceListeners; this.needMetadataForAllTopics = false; + this.isClosed = false; } /** @@ -164,12 +168,12 @@ public synchronized AuthenticationException getAndClearAuthenticationException() * Wait for metadata update until the current version is larger than the last version we know of */ public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException { - if (maxWaitMs < 0) { + if (maxWaitMs < 0) throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milliseconds"); - } + long begin = System.currentTimeMillis(); long remainingWaitMs = maxWaitMs; - while (this.version <= lastVersion) { + while ((this.version <= lastVersion) && !isClosed()) { AuthenticationException ex = getAndClearAuthenticationException(); if (ex != null) throw ex; @@ -180,6 +184,8 @@ public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms."); remainingWaitMs = maxWaitMs - elapsed; } + if (isClosed()) + throw new KafkaException("Requested metadata update after close"); } /** @@ -224,6 +230,8 @@ public synchronized boolean containsTopic(String topic) { */ public synchronized void update(Cluster newCluster, Set<String> unavailableTopics, long now) { Objects.requireNonNull(newCluster, "cluster should not be null"); + if (isClosed()) + throw new IllegalStateException("Update requested after metadata close"); this.needUpdate = false; this.lastRefreshMs = now; @@ -331,6 +339,25 @@ public synchronized void removeListener(Listener listener) { this.listeners.remove(listener); } + /** + * "Close" this metadata instance to indicate that metadata updates are no longer possible. This is typically used + * when the thread responsible for performing metadata updates is exiting and needs a way to relay this information + * to any other thread(s) that could potentially wait on metadata update to come through. + */ + @Override + public synchronized void close() { + this.isClosed = true; + this.notifyAll(); + } + + /** + * Check if this metadata instance has been closed. See {@link #close()} for more information. + * @return True if this instance has been closed; false otherwise + */ + public synchronized boolean isClosed() { + return this.isClosed; + } + /** * MetadataUpdate Listener */ diff --git a/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java b/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java index 09ed995d14c..de765db5a8d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java +++ b/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.requests.MetadataResponse; import org.apache.kafka.common.requests.RequestHeader; +import java.io.Closeable; import java.util.List; /** @@ -29,7 +30,7 @@ * <p> * This class is not thread-safe! */ -public interface MetadataUpdater { +public interface MetadataUpdater extends Closeable { /** * Gets the current cluster info without blocking. @@ -82,4 +83,10 @@ * start of the update if possible (see `maybeUpdate` for more information). */ void requestUpdate(); + + /** + * Close this updater. + */ + @Override + void close(); } diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index 720a7814752..fd16fe608e3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -581,6 +581,7 @@ public void wakeup() { @Override public void close() { this.selector.close(); + this.metadataUpdater.close(); } /** @@ -981,6 +982,11 @@ public void requestUpdate() { this.metadata.requestUpdate(); } + @Override + public void close() { + this.metadata.close(); + } + /** * Return true if there's at least one connection establishment is currently underway */ diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java index 85d3c28e8df..1ad3991ca24 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java @@ -118,6 +118,10 @@ public void handleCompletedMetadataResponse(RequestHeader requestHeader, long no public void requestUpdate() { AdminMetadataManager.this.requestUpdate(); } + + @Override + public void close() { + } } /** diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 3a6717b7676..3519947bf15 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -790,12 +790,12 @@ public void abortTransaction() throws ProducerFencedException { * * @throws AuthenticationException if authentication fails. See the exception for more details * @throws AuthorizationException fatal error indicating that the producer is not allowed to write - * @throws IllegalStateException if a transactional.id has been configured and no transaction has been started + * @throws IllegalStateException if a transactional.id has been configured and no transaction has been started, or + * when send is invoked after producer has been closed. * @throws InterruptException If the thread is interrupted while blocked * @throws SerializationException If the key or value are not valid objects given the configured serializers * @throws TimeoutException If the time taken for fetching metadata or allocating memory for the record has surpassed <code>max.block.ms</code>. * @throws KafkaException If a Kafka related error occurs that does not belong to the public API exceptions. - * */ @Override public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) { @@ -804,14 +804,29 @@ public void abortTransaction() throws ProducerFencedException { return doSend(interceptedRecord, callback); } + // Verify that this producer instance has not been closed. This method throws IllegalStateException if the producer + // has already been closed. + private void throwIfProducerClosed() { + if (ioThread == null || !ioThread.isAlive()) + throw new IllegalStateException("Cannot perform operation after producer has been closed"); + } + /** * Implementation of asynchronously send a record to a topic. */ private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) { TopicPartition tp = null; try { + throwIfProducerClosed(); // first make sure the metadata for the topic is available - ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs); + ClusterAndWaitTime clusterAndWaitTime; + try { + clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs); + } catch (KafkaException e) { + if (metadata.isClosed()) + throw new KafkaException("Producer closed while send in progress", e); + throw e; + } long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs); Cluster cluster = clusterAndWaitTime.cluster; byte[] serializedKey; @@ -896,6 +911,7 @@ private void setReadOnly(Headers headers) { * @param partition A specific partition expected to exist in metadata, or null if there's no preference * @param maxWaitMs The maximum time in ms for waiting on the metadata * @return The cluster containing topic metadata and the amount of time we waited in ms + * @throws KafkaException for all Kafka-related exceptions, including the case where this method is called after producer close */ private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException { // add topic to metadata topic list if it is not there already and reset expiry @@ -1016,8 +1032,9 @@ public void flush() { * Get the partition metadata for the given topic. This can be used for custom partitioning. * @throws AuthenticationException if authentication fails. See the exception for more details * @throws AuthorizationException if not authorized to the specified topic. See the exception for more details - * @throws InterruptException If the thread is interrupted while blocked + * @throws InterruptException if the thread is interrupted while blocked * @throws TimeoutException if metadata could not be refreshed within {@code max.block.ms} + * @throws KafkaException for all Kafka-related exceptions, including the case where this method is called after producer close */ @Override public List<PartitionInfo> partitionsFor(String topic) { diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java index 9e9869a7e48..dc00b473027 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java @@ -311,9 +311,6 @@ public void close() { @Override public void close(long timeout, TimeUnit timeUnit) { - if (this.closed) { - throw new IllegalStateException("MockProducer is already closed."); - } this.closed = true; } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index e2b58448dc9..31c6d754c9d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -19,6 +19,7 @@ import org.apache.kafka.clients.ApiVersions; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; @@ -195,7 +196,7 @@ public RecordAppendResult append(TopicPartition tp, Deque<ProducerBatch> dq = getOrCreateDeque(tp); synchronized (dq) { if (closed) - throw new IllegalStateException("Cannot send after the producer is closed."); + throw new KafkaException("Producer closed while send in progress"); RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq); if (appendResult != null) return appendResult; @@ -209,7 +210,7 @@ public RecordAppendResult append(TopicPartition tp, synchronized (dq) { // Need to check if producer is closed again after grabbing the dequeue lock. if (closed) - throw new IllegalStateException("Cannot send after the producer is closed."); + throw new KafkaException("Producer closed while send in progress"); RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq); if (appendResult != null) { @@ -700,7 +701,7 @@ public void abortIncompleteBatches() { * Go through incomplete batches and abort them. */ private void abortBatches() { - abortBatches(new IllegalStateException("Producer is closed forcefully.")); + abortBatches(new KafkaException("Producer is closed forcefully.")); } /** diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java index 1188af776aa..969921eceeb 100644 --- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java @@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.internals.ClusterResourceListeners; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; @@ -46,7 +47,7 @@ private long refreshBackoffMs = 100; private long metadataExpireMs = 1000; private Metadata metadata = new Metadata(refreshBackoffMs, metadataExpireMs, true); - private AtomicReference<String> backgroundError = new AtomicReference<>(); + private AtomicReference<Exception> backgroundError = new AtomicReference<>(); @After public void tearDown() { @@ -83,6 +84,30 @@ public void testMetadata() throws Exception { assertTrue("Update needed due to stale metadata.", metadata.timeToNextUpdate(time) == 0); } + @Test + public void testMetadataAwaitAfterClose() throws InterruptedException { + long time = 0; + metadata.update(Cluster.empty(), Collections.<String>emptySet(), time); + assertFalse("No update needed.", metadata.timeToNextUpdate(time) == 0); + metadata.requestUpdate(); + assertFalse("Still no updated needed due to backoff", metadata.timeToNextUpdate(time) == 0); + time += refreshBackoffMs; + assertTrue("Update needed now that backoff time expired", metadata.timeToNextUpdate(time) == 0); + String topic = "my-topic"; + metadata.close(); + Thread t1 = asyncFetch(topic, 500); + t1.join(); + assertTrue(backgroundError.get().getClass() == KafkaException.class); + assertTrue(backgroundError.get().toString().contains("Requested metadata update after close")); + clearBackgroundError(); + } + + @Test(expected = IllegalStateException.class) + public void testMetadataUpdateAfterClose() { + metadata.close(); + metadata.update(Cluster.empty(), Collections.<String>emptySet(), 1000); + } + private static void checkTimeToNextUpdate(long refreshBackoffMs, long metadataExpireMs) { long now = 10000; @@ -409,15 +434,18 @@ public void testNonExpiringMetadata() throws Exception { assertTrue("Unused topic expired when expiry disabled", metadata.containsTopic("topic4")); } + private void clearBackgroundError() { + backgroundError.set(null); + } + private Thread asyncFetch(final String topic, final long maxWaitMs) { Thread thread = new Thread() { public void run() { - while (metadata.fetch().partitionsForTopic(topic).isEmpty()) { - try { + try { + while (metadata.fetch().partitionsForTopic(topic).isEmpty()) metadata.awaitUpdate(metadata.requestUpdate(), maxWaitMs); - } catch (Exception e) { - backgroundError.set(e.toString()); - } + } catch (Exception e) { + backgroundError.set(e); } } }; diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java index 0f64f13ef60..6b41a9e8779 100644 --- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java @@ -533,6 +533,7 @@ public ClientRequest newClientRequest(String nodeId, @Override public void close() { + metadata.close(); } @Override diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java index bf03e46ec08..dd2dd896b28 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java @@ -632,8 +632,8 @@ public void testSendToInvalidTopic() throws Exception { client.setNode(node); Producer<String, String> producer = new KafkaProducer<>(new ProducerConfig( - ProducerConfig.addSerializerToConfig(props, new StringSerializer(), new StringSerializer())), - new StringSerializer(), new StringSerializer(), metadata, client); + ProducerConfig.addSerializerToConfig(props, new StringSerializer(), new StringSerializer())), + new StringSerializer(), new StringSerializer(), metadata, client); String invalidTopicName = "topic abc"; // Invalid topic name due to space ProducerRecord<String, String> record = new ProducerRecord<>(invalidTopicName, "HelloKafka"); @@ -641,12 +641,12 @@ public void testSendToInvalidTopic() throws Exception { Set<String> invalidTopic = new HashSet<String>(); invalidTopic.add(invalidTopicName); Cluster metaDataUpdateResponseCluster = new Cluster(cluster.clusterResource().clusterId(), - cluster.nodes(), - new ArrayList<PartitionInfo>(0), - Collections.<String>emptySet(), - invalidTopic, - cluster.internalTopics(), - cluster.controller()); + cluster.nodes(), + new ArrayList<PartitionInfo>(0), + Collections.<String>emptySet(), + invalidTopic, + cluster.internalTopics(), + cluster.controller()); client.prepareMetadataUpdate(metaDataUpdateResponseCluster, Collections.<String>emptySet()); Future<RecordMetadata> future = producer.send(record); @@ -654,4 +654,51 @@ public void testSendToInvalidTopic() throws Exception { assertEquals("Cluster has incorrect invalid topic list.", metaDataUpdateResponseCluster.invalidTopics(), metadata.fetch().invalidTopics()); TestUtils.assertFutureError(future, InvalidTopicException.class); } + + @Test + public void testCloseWhenWaitingForMetadataUpdate() throws InterruptedException { + Properties props = new Properties(); + props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.MAX_VALUE); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000"); + + // Simulate a case where metadata for a particular topic is not available. This will cause KafkaProducer#send to + // block in Metadata#awaitUpdate for the configured max.block.ms. When close() is invoked, KafkaProducer#send should + // return with a KafkaException. + String topicName = "test"; + Time time = new MockTime(); + Cluster cluster = TestUtils.singletonCluster(); + Node node = cluster.nodes().get(0); + Metadata metadata = new Metadata(0, Long.MAX_VALUE, false); + metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds()); + MockClient client = new MockClient(time, metadata); + client.setNode(node); + + Producer<String, String> producer = new KafkaProducer<>( + new ProducerConfig(ProducerConfig.addSerializerToConfig(props, new StringSerializer(), new StringSerializer())), + new StringSerializer(), new StringSerializer(), metadata, client); + + ExecutorService executor = Executors.newSingleThreadExecutor(); + final AtomicReference<Exception> sendException = new AtomicReference<>(); + + try { + executor.submit(() -> { + try { + // Metadata for topic "test" will not be available which will cause us to block indefinitely until + // KafkaProducer#close is invoked. + producer.send(new ProducerRecord<>(topicName, "key", "value")); + fail(); + } catch (Exception e) { + sendException.set(e); + } + }); + + // Wait until metadata update for the topic has been requested + TestUtils.waitForCondition(() -> metadata.containsTopic(topicName), "Timeout when waiting for topic to be added to metadata"); + producer.close(0, TimeUnit.MILLISECONDS); + TestUtils.waitForCondition(() -> sendException.get() != null, "No producer exception within timeout"); + assertEquals(KafkaException.class, sendException.get().getClass()); + } finally { + executor.shutdownNow(); + } + } } diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java index 27fac280afc..7a8c710b76b 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java @@ -636,16 +636,6 @@ public void shouldThrowOnAbortTransactionIfProducerIsClosed() { } catch (IllegalStateException e) { } } - @Test - public void shouldThrowOnCloseIfProducerIsClosed() { - buildMockProducer(true); - producer.close(); - try { - producer.close(); - fail("Should have thrown as producer is already closed"); - } catch (IllegalStateException e) { } - } - @Test public void shouldThrowOnFenceProducerIfProducerIsClosed() { buildMockProducer(true); diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index 92396a7bdab..9cc6ebe1c86 100755 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -31,7 +31,7 @@ import kafka.utils.{CommandLineUtils, CoreUtils, Logging, Whitelist} import org.apache.kafka.clients.consumer.{CommitFailedException, Consumer, ConsumerConfig, ConsumerRebalanceListener, ConsumerRecord, KafkaConsumer, OffsetAndMetadata} import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata} -import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.{KafkaException, TopicPartition} import org.apache.kafka.common.serialization.ByteArrayDeserializer import org.apache.kafka.common.utils.Utils import org.apache.kafka.common.errors.WakeupException @@ -357,6 +357,8 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { trace("Caught NoRecordsException, continue iteration.") case _: WakeupException => trace("Caught WakeupException, continue iteration.") + case e: KafkaException if (shuttingDown || exitingOnSendFailure) => + trace(s"Ignoring caught KafkaException during shutdown. sendFailure: $exitingOnSendFailure.", e) } maybeFlushAndCommitOffsets() } diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala index ee0e90f1807..dc4041f1d63 100644 --- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala +++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala @@ -35,6 +35,7 @@ import org.junit.Assert._ import org.junit.{After, Before, Test} import scala.collection.mutable.{ArrayBuffer, Buffer} +import scala.concurrent.ExecutionException abstract class BaseProducerSendTest extends KafkaServerTestHarness { @@ -446,8 +447,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { future.get() fail("No message should be sent successfully.") } catch { - case e: Exception => - assertEquals("java.lang.IllegalStateException: Producer is closed forcefully.", e.getMessage) + case e: ExecutionException => assertEquals(classOf[KafkaException], e.getCause.getClass) } } assertEquals("Fetch response should have no message returned.", 0, consumer.poll(50).count) diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala index 7969485d81b..9b77c2d4169 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala @@ -205,7 +205,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness { // create topic createTopic(topic1, replicationFactor = numServers) - val record = new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, "key".getBytes, "value".getBytes) + val record = new ProducerRecord[Array[Byte], Array[Byte]](topic1, null, "key".getBytes, "value".getBytes) // first send a message to make sure the metadata is refreshed producer1.send(record).get ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Mirrormaker waits to shut down forever on produce failure with > abort.on.send.failure=true > ------------------------------------------------------------------------------------------ > > Key: KAFKA-6897 > URL: https://issues.apache.org/jira/browse/KAFKA-6897 > Project: Kafka > Issue Type: Bug > Affects Versions: 1.1.0 > Reporter: Koelli Mungee > Assignee: Dhruvil Shah > Priority: Major > Fix For: 2.1.0 > > Attachments: mirror_maker_thread_dump.log > > > Mirrormaker never shuts down after a produce failure when > abort.on.send.failure=true > {code:java} > [2018-05-07 08:29:32,417] ERROR Error when sending message to topic test with > key: 52 bytes, value: 32615 bytes with error: > (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback) > org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for > test-11: 45646 ms has passed since last append > [2018-05-07 08:29:32,434] INFO Closing producer due to send failure. > (kafka.tools.MirrorMaker$) > [2018-05-07 08:29:32,434] INFO [Producer clientId=producer-1] Closing the > Kafka producer with timeoutMillis = 0 ms. > (org.apache.kafka.clients.producer.KafkaProducer) > {code} > A stack trace of this mirrormaker process 9 hours later shows that the main > thread is still active and it is waiting for metadata that it will never get > since the producer send thread is no longer running. -- This message was sent by Atlassian JIRA (v7.6.3#76005)