Repository: kafka Updated Branches: refs/heads/trunk b512cd474 -> 38c5d7fba
KAFKA-5936; KafkaProducer.close should throw InterruptException Author: Matthias J. Sax <matth...@confluent.io> Reviewers: Apurva Mehta <apu...@confluent.io>, Guozhang Wang <wangg...@gmail.com>, Jason Gustafson <ja...@confluent.io> Closes #3912 from mjsax/kafka-5936-producer-close Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/38c5d7fb Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/38c5d7fb Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/38c5d7fb Branch: refs/heads/trunk Commit: 38c5d7fba7387b797a10c9c6ed71bf99c6d417bc Parents: b512cd4 Author: Matthias J. Sax <matth...@confluent.io> Authored: Wed Nov 29 16:22:07 2017 -0800 Committer: Jason Gustafson <ja...@confluent.io> Committed: Wed Nov 29 16:22:07 2017 -0800 ---------------------------------------------------------------------- .../org/apache/kafka/clients/ClientUtils.java | 18 ++--- .../kafka/clients/producer/KafkaProducer.java | 41 +++++++---- .../clients/consumer/KafkaConsumerTest.java | 23 +++--- .../clients/producer/KafkaProducerTest.java | 76 +++++++++++++++++++- 4 files changed, 123 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/38c5d7fb/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java index ea4c4db..2a9b763 100644 --- a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java +++ b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java @@ -16,22 +16,22 @@ */ package org.apache.kafka.clients; -import java.io.Closeable; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.atomic.AtomicReference; - import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.network.ChannelBuilder; import org.apache.kafka.common.network.ChannelBuilders; import org.apache.kafka.common.security.JaasContext; import org.apache.kafka.common.security.auth.SecurityProtocol; -import org.apache.kafka.common.network.ChannelBuilder; -import org.apache.kafka.common.config.ConfigException; -import org.apache.kafka.common.config.SaslConfigs; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.Closeable; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; + import static org.apache.kafka.common.utils.Utils.getHost; import static org.apache.kafka.common.utils.Utils.getPort; http://git-wip-us.apache.org/repos/asf/kafka/blob/38c5d7fb/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index b3cff19..1fec744 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -18,6 +18,7 @@ package org.apache.kafka.clients.producer; import org.apache.kafka.clients.ApiVersions; import org.apache.kafka.clients.ClientUtils; +import org.apache.kafka.clients.KafkaClient; import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.NetworkClient; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -265,7 +266,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> { * */ public KafkaProducer(Map<String, Object> configs) { - this(new ProducerConfig(configs), null, null); + this(new ProducerConfig(configs), null, null, null, null); } /** @@ -281,7 +282,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> { */ public KafkaProducer(Map<String, Object> configs, Serializer<K> keySerializer, Serializer<V> valueSerializer) { this(new ProducerConfig(ProducerConfig.addSerializerToConfig(configs, keySerializer, valueSerializer)), - keySerializer, valueSerializer); + keySerializer, valueSerializer, null, null); } /** @@ -290,7 +291,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> { * @param properties The producer configs */ public KafkaProducer(Properties properties) { - this(new ProducerConfig(properties), null, null); + this(new ProducerConfig(properties), null, null, null, null); } /** @@ -304,11 +305,16 @@ public class KafkaProducer<K, V> implements Producer<K, V> { */ public KafkaProducer(Properties properties, Serializer<K> keySerializer, Serializer<V> valueSerializer) { this(new ProducerConfig(ProducerConfig.addSerializerToConfig(properties, keySerializer, valueSerializer)), - keySerializer, valueSerializer); + keySerializer, valueSerializer, null, null); } @SuppressWarnings("unchecked") - private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) { + // visible for testing + KafkaProducer(ProducerConfig config, + Serializer<K> keySerializer, + Serializer<V> valueSerializer, + Metadata metadata, + KafkaClient kafkaClient) { try { Map<String, Object> userProvidedConfigs = config.originals(); this.producerConfig = config; @@ -363,8 +369,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> { ProducerInterceptor.class); this.interceptors = interceptorList.isEmpty() ? null : new ProducerInterceptors<>(interceptorList); ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keySerializer, valueSerializer, interceptorList, reporters); - this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG), - true, true, clusterResourceListeners); this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG); this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG); this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG)); @@ -388,10 +392,16 @@ public class KafkaProducer<K, V> implements Producer<K, V> { apiVersions, transactionManager); List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); - this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), time.milliseconds()); + if (metadata != null) { + this.metadata = metadata; + } else { + this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG), + true, true, clusterResourceListeners); + this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), time.milliseconds()); + } ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config); Sensor throttleTimeSensor = Sender.throttleTimeSensor(metricsRegistry.senderMetrics); - NetworkClient client = new NetworkClient( + KafkaClient client = kafkaClient != null ? kafkaClient : new NetworkClient( new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", channelBuilder, logContext), this.metadata, @@ -1051,7 +1061,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> { try { this.ioThread.join(timeUnit.toMillis(timeout)); } catch (InterruptedException t) { - firstException.compareAndSet(null, t); + firstException.compareAndSet(null, new InterruptException(t)); log.error("Interrupted while joining ioThread", t); } } @@ -1067,7 +1077,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> { try { this.ioThread.join(); } catch (InterruptedException e) { - firstException.compareAndSet(null, e); + firstException.compareAndSet(null, new InterruptException(e)); } } } @@ -1079,8 +1089,13 @@ public class KafkaProducer<K, V> implements Producer<K, V> { ClientUtils.closeQuietly(partitioner, "producer partitioner", firstException); AppInfoParser.unregisterAppInfo(JMX_PREFIX, clientId, metrics); log.debug("Kafka producer has been closed"); - if (firstException.get() != null && !swallowException) - throw new KafkaException("Failed to close kafka producer", firstException.get()); + Throwable exception = firstException.get(); + if (exception != null && !swallowException) { + if (exception instanceof InterruptException) { + throw (InterruptException) exception; + } + throw new KafkaException("Failed to close kafka producer", exception); + } } private ClusterResourceListeners configureClusterResourceListeners(Serializer<K> keySerializer, Serializer<V> valueSerializer, List<?>... candidateLists) { http://git-wip-us.apache.org/repos/asf/kafka/blob/38c5d7fb/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 12254c9..ab682d6 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -68,6 +68,7 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.test.MockConsumerInterceptor; import org.apache.kafka.test.MockMetricsReporter; +import org.apache.kafka.test.TestCondition; import org.apache.kafka.test.TestUtils; import org.junit.Assert; import org.junit.Rule; @@ -1365,7 +1366,7 @@ public class KafkaConsumerTest { // Kafka consumer is single-threaded, but the implementation allows calls on a // different thread as long as the calls are not executed concurrently. So this is safe. ExecutorService executor = Executors.newSingleThreadExecutor(); - final AtomicReference<Exception> closeException = new AtomicReference<Exception>(); + final AtomicReference<Exception> closeException = new AtomicReference<>(); try { Future<?> future = executor.submit(new Runnable() { @Override @@ -1409,21 +1410,21 @@ public class KafkaConsumerTest { if (waitMs > 0) time.sleep(waitMs); - if (interrupt) + if (interrupt) { assertTrue("Close terminated prematurely", future.cancel(true)); - // Make sure that close task completes and another task can be run on the single threaded executor - executor.submit(new Runnable() { - @Override - public void run() { - } - }).get(500, TimeUnit.MILLISECONDS); + TestUtils.waitForCondition(new TestCondition() { + @Override + public boolean conditionMet() { + return closeException.get() != null; + } + }, "InterruptException did not occur within timeout."); - if (!interrupt) { + assertTrue("Expected exception not thrown " + closeException, closeException.get() instanceof InterruptException); + } else { future.get(500, TimeUnit.MILLISECONDS); // Should succeed without TimeoutException or ExecutionException assertNull("Unexpected exception during close", closeException.get()); - } else - assertTrue("Expected exception not thrown " + closeException, closeException.get() instanceof InterruptException); + } } finally { executor.shutdownNow(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/38c5d7fb/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java index 26f7588..9f70fd7 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java @@ -18,6 +18,7 @@ package org.apache.kafka.clients.producer; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.MockClient; import org.apache.kafka.clients.producer.internals.ProducerInterceptors; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.KafkaException; @@ -25,9 +26,10 @@ import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.errors.InterruptException; import org.apache.kafka.common.errors.TimeoutException; -import org.apache.kafka.common.internals.ClusterResourceListeners; import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.internals.ClusterResourceListeners; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.network.Selectable; import org.apache.kafka.common.serialization.ByteArraySerializer; @@ -36,9 +38,11 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.test.MockMetricsReporter; +import org.apache.kafka.test.MockPartitioner; import org.apache.kafka.test.MockProducerInterceptor; import org.apache.kafka.test.MockSerializer; -import org.apache.kafka.test.MockPartitioner; +import org.apache.kafka.test.TestCondition; +import org.apache.kafka.test.TestUtils; import org.easymock.EasyMock; import org.junit.Assert; import org.junit.Test; @@ -55,6 +59,11 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Properties; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -147,6 +156,7 @@ public class KafkaProducerTest { try { Properties props = new Properties(); props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); + MockPartitioner.resetCounters(); props.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, MockPartitioner.class.getName()); KafkaProducer<String, String> producer = new KafkaProducer<String, String>( @@ -164,6 +174,68 @@ public class KafkaProducerTest { } @Test + public void shouldCloseProperlyAndThrowIfInterrupted() throws Exception { + Properties props = new Properties(); + props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); + props.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, MockPartitioner.class.getName()); + props.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "1"); + + Time time = new MockTime(); + Cluster cluster = TestUtils.singletonCluster("topic", 1); + Node node = cluster.nodes().get(0); + + Metadata metadata = new Metadata(0, Long.MAX_VALUE, true); + metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds()); + + MockClient client = new MockClient(time, metadata); + client.setNode(node); + + final Producer<String, String> producer = new KafkaProducer<>( + new ProducerConfig(ProducerConfig.addSerializerToConfig(props, new StringSerializer(), new StringSerializer())), + new StringSerializer(), new StringSerializer(), metadata, client); + + ExecutorService executor = Executors.newSingleThreadExecutor(); + final AtomicReference<Exception> closeException = new AtomicReference<>(); + try { + Future<?> future = executor.submit(new Runnable() { + @Override + public void run() { + producer.send(new ProducerRecord<>("topic", "key", "value")); + try { + producer.close(); + fail("Close should block and throw."); + } catch (Exception e) { + closeException.set(e); + } + } + }); + + // Close producer should not complete until send succeeds + try { + future.get(100, TimeUnit.MILLISECONDS); + fail("Close completed without waiting for send"); + } catch (java.util.concurrent.TimeoutException expected) { /* ignore */ } + + // Ensure send has started + client.waitForRequests(1, 1000); + + assertTrue("Close terminated prematurely", future.cancel(true)); + + TestUtils.waitForCondition(new TestCondition() { + @Override + public boolean conditionMet() { + return closeException.get() != null; + } + }, "InterruptException did not occur within timeout."); + + assertTrue("Expected exception not thrown " + closeException, closeException.get() instanceof InterruptException); + } finally { + executor.shutdownNow(); + } + + } + + @Test public void testOsDefaultSocketBufferSizes() throws Exception { Map<String, Object> config = new HashMap<>(); config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");