This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new e325676 KAFKA-6833; Producer should await metadata for unknown
partitions (#6073)
e325676 is described below
commit e32567699451e2fb0ccd63dd8a3df582cfd18d61
Author: Bob Barrett <[email protected]>
AuthorDate: Tue Jan 8 21:05:13 2019 -0500
KAFKA-6833; Producer should await metadata for unknown partitions (#6073)
This patch changes the behavior of KafkaProducer.waitOnMetadata to wait up
to max.block.ms when the partition specified in the produce request is outside
the range of partitions present in the metadata. Previously the producer threw
a KafkaException immediately in this case; now it keeps requesting metadata
updates and fails the send with a TimeoutException only if the partition is
still missing after max.block.ms. This improves the user experience when
partitions are added to a topic and a client attempts to produce to one of the
new partitions before the metadata has propagated to the brokers. Tested with
unit tests.
Reviewers: Arjun Satish <[email protected]>, Jason Gustafson
<[email protected]>
---
.../kafka/clients/producer/KafkaProducer.java | 31 ++--
.../kafka/clients/producer/KafkaProducerTest.java | 182 +++++++++++++--------
.../kafka/api/BaseProducerSendTest.scala | 20 ++-
.../kafka/api/ProducerFailureHandlingTest.scala | 14 +-
4 files changed, 153 insertions(+), 94 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 85ed9f8..540493d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -966,12 +966,15 @@ public class KafkaProducer<K, V> implements Producer<K,
V> {
long begin = time.milliseconds();
long remainingWaitMs = maxWaitMs;
long elapsed;
- // Issue metadata requests until we have metadata for the topic or
maxWaitTimeMs is exceeded.
- // In case we already have cached metadata for the topic, but the
requested partition is greater
- // than expected, issue an update request only once. This is necessary
in case the metadata
+ // Issue metadata requests until we have metadata for the topic and
the requested partition,
+ // or until maxWaitTimeMs is exceeded. This is necessary in case the
metadata
// is stale and the number of partitions for this topic has increased
in the meantime.
do {
- log.trace("Requesting metadata update for topic {}.", topic);
+ if (partition != null) {
+ log.trace("Requesting metadata update for partition {} of
topic {}.", partition, topic);
+ } else {
+ log.trace("Requesting metadata update for topic {}.", topic);
+ }
metadata.add(topic);
int version = metadata.requestUpdate();
sender.wakeup();
@@ -979,24 +982,26 @@ public class KafkaProducer<K, V> implements Producer<K,
V> {
metadata.awaitUpdate(version, remainingWaitMs);
} catch (TimeoutException ex) {
// Rethrow with original maxWaitMs to prevent logging
exception with remainingWaitMs
- throw new TimeoutException("Failed to update metadata after "
+ maxWaitMs + " ms.");
+ throw new TimeoutException(
+ String.format("Topic %s not present in metadata after
%d ms.",
+ topic, maxWaitMs));
}
cluster = metadata.fetch();
elapsed = time.milliseconds() - begin;
- if (elapsed >= maxWaitMs)
- throw new TimeoutException("Failed to update metadata after "
+ maxWaitMs + " ms.");
+ if (elapsed >= maxWaitMs) {
+ throw new TimeoutException(partitionsCount == null ?
+ String.format("Topic %s not present in metadata after
%d ms.",
+ topic, maxWaitMs) :
+ String.format("Partition %d of topic %s with partition
count %d is not present in metadata after %d ms.",
+ partition, topic, partitionsCount, maxWaitMs));
+ }
if (cluster.unauthorizedTopics().contains(topic))
throw new TopicAuthorizationException(topic);
if (cluster.invalidTopics().contains(topic))
throw new InvalidTopicException(topic);
remainingWaitMs = maxWaitMs - elapsed;
partitionsCount = cluster.partitionCountForTopic(topic);
- } while (partitionsCount == null);
-
- if (partition != null && partition >= partitionsCount) {
- throw new KafkaException(
- String.format("Invalid partition given with record: %d is
not in the range [0...%d).", partition, partitionsCount));
- }
+ } while (partitionsCount == null || (partition != null && partition >=
partitionsCount));
return new ClusterAndWaitTime(cluster, elapsed);
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 872d390..a73ab7c 100644
---
a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -58,6 +58,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -65,9 +66,9 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
-import static org.junit.Assert.assertArrayEquals;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonMap;
+import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -82,6 +83,27 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class KafkaProducerTest {
+ private String topic = "topic";
+ private Collection<Node> nodes = Collections.singletonList(new Node(0,
"host1", 1000));
+ private final Cluster emptyCluster = new Cluster(null, nodes,
+ Collections.emptySet(),
+ Collections.emptySet(),
+ Collections.emptySet());
+ private final Cluster onePartitionCluster = new Cluster(
+ "dummy",
+ Collections.singletonList(new Node(0, "host1", 1000)),
+ Collections.singletonList(new PartitionInfo(topic, 0, null, null,
null)),
+ Collections.emptySet(),
+ Collections.emptySet());
+ private final Cluster threePartitionCluster = new Cluster(
+ "dummy",
+ Collections.singletonList(new Node(0, "host1", 1000)),
+ Arrays.asList(
+ new PartitionInfo(topic, 0, null, null, null),
+ new PartitionInfo(topic, 1, null, null, null),
+ new PartitionInfo(topic, 2, null, null, null)),
+ Collections.emptySet(),
+ Collections.emptySet());
@Test
public void testMetricsReporterAutoGeneratedClientId() {
@@ -289,22 +311,10 @@ public class KafkaProducerTest {
public void testMetadataFetch() throws InterruptedException {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
- String topic = "topic";
- Collection<Node> nodes = Collections.singletonList(new Node(0,
"host1", 1000));
- final Cluster emptyCluster = new Cluster(null, nodes,
- Collections.emptySet(),
- Collections.emptySet(),
- Collections.emptySet());
- final Cluster cluster = new Cluster(
- "dummy",
- Collections.singletonList(new Node(0, "host1", 1000)),
- Collections.singletonList(new PartitionInfo(topic, 0, null,
null, null)),
- Collections.emptySet(),
- Collections.emptySet());
Metadata metadata = mock(Metadata.class);
// Return empty cluster 4 times and cluster from then on
- when(metadata.fetch()).thenReturn(emptyCluster, emptyCluster,
emptyCluster, emptyCluster, cluster);
+ when(metadata.fetch()).thenReturn(emptyCluster, emptyCluster,
emptyCluster, emptyCluster, onePartitionCluster);
KafkaProducer<String, String> producer = new KafkaProducer<String,
String>(configs, new StringSerializer(),
new StringSerializer(), metadata, new MockClient(Time.SYSTEM,
metadata), null, Time.SYSTEM) {
@@ -338,91 +348,127 @@ public class KafkaProducerTest {
}
@Test
- public void testMetadataFetchOnStaleMetadata() throws Exception {
+ public void testMetadataTimeoutWithMissingTopic() throws Exception {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
- String topic = "topic";
- ProducerRecord<String, String> initialRecord = new
ProducerRecord<>(topic, "value");
+ configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60000);
+
// Create a record with a partition higher than the initial (outdated)
partition range
- ProducerRecord<String, String> extendedRecord = new
ProducerRecord<>(topic, 2, null, "value");
- Collection<Node> nodes = Collections.singletonList(new Node(0,
"host1", 1000));
- final Cluster emptyCluster = new Cluster(null, nodes,
- Collections.emptySet(),
- Collections.emptySet(),
- Collections.emptySet());
- final Cluster initialCluster = new Cluster(
- "dummy",
- Collections.singletonList(new Node(0, "host1", 1000)),
- Collections.singletonList(new PartitionInfo(topic, 0, null,
null, null)),
- Collections.emptySet(),
- Collections.emptySet());
- final Cluster extendedCluster = new Cluster(
- "dummy",
- Collections.singletonList(new Node(0, "host1", 1000)),
- Arrays.asList(
- new PartitionInfo(topic, 0, null, null, null),
- new PartitionInfo(topic, 1, null, null, null),
- new PartitionInfo(topic, 2, null, null, null)),
- Collections.emptySet(),
- Collections.emptySet());
+ ProducerRecord<String, String> record = new ProducerRecord<>(topic, 2,
null, "value");
Metadata metadata = mock(Metadata.class);
+ MockTime mockTime = new MockTime();
AtomicInteger invocationCount = new AtomicInteger(0);
-
- // Return empty cluster 4 times, initialCluster 5 times and
extendedCluster after that
when(metadata.fetch()).then(invocation -> {
invocationCount.incrementAndGet();
- if (invocationCount.get() > 9)
- return extendedCluster;
- else if (invocationCount.get() > 4)
- return initialCluster;
+ if (invocationCount.get() == 5) {
+ mockTime.setCurrentTimeMs(mockTime.milliseconds() + 70000);
+ }
+
return emptyCluster;
});
KafkaProducer<String, String> producer = new KafkaProducer<String,
String>(configs, new StringSerializer(),
- new StringSerializer(), metadata, new MockClient(Time.SYSTEM,
metadata), null, Time.SYSTEM) {
+ new StringSerializer(), metadata, new MockClient(Time.SYSTEM,
metadata), null, mockTime) {
@Override
Sender newSender(LogContext logContext, KafkaClient kafkaClient,
Metadata metadata) {
// give Sender its own Metadata instance so that we can
isolate Metadata calls from KafkaProducer
return super.newSender(logContext, kafkaClient, new
Metadata(0, 100_000, true));
}
};
- producer.send(initialRecord);
- // One request update for each empty cluster returned
+ // Four request updates where the topic isn't present, at which point
the timeout expires and a
+ // TimeoutException is thrown
+ Future future = producer.send(record);
verify(metadata, times(4)).requestUpdate();
verify(metadata, times(4)).awaitUpdate(anyInt(), anyLong());
verify(metadata, times(5)).fetch();
-
- // Should not request update if metadata is available and records are
within range
- producer.send(initialRecord);
- verify(metadata, times(4)).requestUpdate();
- verify(metadata, times(4)).awaitUpdate(anyInt(), anyLong());
- verify(metadata, times(6)).fetch();
-
- // One request update followed by exception if topic metadata is
available but metadata response still returns
- // the same partition size (either because metadata are still stale at
the broker too or because
- // there weren't any partitions added in the first place)
try {
- producer.send(extendedRecord);
- fail("Expected KafkaException to be raised");
- } catch (KafkaException e) {
- // expected
+ future.get();
+ } catch (ExecutionException e) {
+ assertTrue(e.getCause() instanceof TimeoutException);
+ } finally {
+ producer.close(0, TimeUnit.MILLISECONDS);
}
- verify(metadata, times(5)).requestUpdate();
- verify(metadata, times(5)).awaitUpdate(anyInt(), anyLong());
- verify(metadata, times(8)).fetch();
+ }
+ @Test
+ public void testMetadataWithPartitionOutOfRange() throws Exception {
+ Map<String, Object> configs = new HashMap<>();
+ configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+ configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60000);
+
+ // Create a record with a partition higher than the initial (outdated)
partition range
+ ProducerRecord<String, String> record = new ProducerRecord<>(topic, 2,
null, "value");
+ Metadata metadata = mock(Metadata.class);
+
+ MockTime mockTime = new MockTime();
+
+ when(metadata.fetch()).thenReturn(onePartitionCluster,
onePartitionCluster, threePartitionCluster);
+
+ KafkaProducer<String, String> producer = new KafkaProducer<String,
String>(configs, new StringSerializer(),
+ new StringSerializer(), metadata, new MockClient(Time.SYSTEM,
metadata), null, mockTime) {
+ @Override
+ Sender newSender(LogContext logContext, KafkaClient kafkaClient,
Metadata metadata) {
+ // give Sender its own Metadata instance so that we can
isolate Metadata calls from KafkaProducer
+ return super.newSender(logContext, kafkaClient, new
Metadata(0, 100_000, true));
+ }
+ };
// One request update if metadata is available but outdated for the
given record
- producer.send(extendedRecord);
- verify(metadata, times(6)).requestUpdate();
- verify(metadata, times(6)).awaitUpdate(anyInt(), anyLong());
- verify(metadata, times(10)).fetch();
+ producer.send(record);
+ verify(metadata, times(2)).requestUpdate();
+ verify(metadata, times(2)).awaitUpdate(anyInt(), anyLong());
+ verify(metadata, times(3)).fetch();
producer.close(0, TimeUnit.MILLISECONDS);
}
@Test
+ public void testMetadataTimeoutWithPartitionOutOfRange() throws Exception {
+ Map<String, Object> configs = new HashMap<>();
+ configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+ configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60000);
+
+ // Create a record with a partition higher than the initial (outdated)
partition range
+ ProducerRecord<String, String> record = new ProducerRecord<>(topic, 2,
null, "value");
+ Metadata metadata = mock(Metadata.class);
+
+ MockTime mockTime = new MockTime();
+ AtomicInteger invocationCount = new AtomicInteger(0);
+ when(metadata.fetch()).then(invocation -> {
+ invocationCount.incrementAndGet();
+ if (invocationCount.get() == 5) {
+ mockTime.setCurrentTimeMs(mockTime.milliseconds() + 70000);
+ }
+
+ return onePartitionCluster;
+ });
+
+ KafkaProducer<String, String> producer = new KafkaProducer<String,
String>(configs, new StringSerializer(),
+ new StringSerializer(), metadata, new MockClient(Time.SYSTEM,
metadata), null, mockTime) {
+ @Override
+ Sender newSender(LogContext logContext, KafkaClient kafkaClient,
Metadata metadata) {
+ // give Sender its own Metadata instance so that we can
isolate Metadata calls from KafkaProducer
+ return super.newSender(logContext, kafkaClient, new
Metadata(0, 100_000, true));
+ }
+ };
+
+ // Four request updates where the requested partition is out of range,
at which point the timeout expires
+ // and a TimeoutException is thrown
+ Future future = producer.send(record);
+ verify(metadata, times(4)).requestUpdate();
+ verify(metadata, times(4)).awaitUpdate(anyInt(), anyLong());
+ verify(metadata, times(5)).fetch();
+ try {
+ future.get();
+ } catch (ExecutionException e) {
+ assertTrue(e.getCause() instanceof TimeoutException);
+ } finally {
+ producer.close(0, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ @Test
public void testTopicRefreshInMetadata() throws InterruptedException {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
diff --git
a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index 09a6188..9d454e9 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -21,19 +21,20 @@ import java.nio.charset.StandardCharsets
import java.util.Properties
import java.util.concurrent.TimeUnit
-import collection.JavaConverters._
import kafka.integration.KafkaServerTestHarness
import kafka.log.LogConfig
import kafka.server.KafkaConfig
import kafka.utils.TestUtils
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer._
-import org.apache.kafka.common.{KafkaException, TopicPartition}
+import org.apache.kafka.common.errors.TimeoutException
import org.apache.kafka.common.record.TimestampType
import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.junit.Assert._
import org.junit.{After, Before, Test}
+import scala.collection.JavaConverters._
import scala.collection.mutable.Buffer
import scala.concurrent.ExecutionException
@@ -71,13 +72,15 @@ abstract class BaseProducerSendTest extends
KafkaServerTestHarness {
protected def createProducer(brokerList: String,
lingerMs: Int = 0,
batchSize: Int = 16384,
- compressionType: String = "none"):
KafkaProducer[Array[Byte],Array[Byte]] = {
+ compressionType: String = "none",
+ maxBlockMs: Long = 60 * 1000L):
KafkaProducer[Array[Byte],Array[Byte]] = {
val producer = TestUtils.createProducer(brokerList,
compressionType = compressionType,
securityProtocol = securityProtocol,
trustStoreFile = trustStoreFile,
saslProperties = clientSaslProperties,
- lingerMs = lingerMs)
+ lingerMs = lingerMs,
+ maxBlockMs = maxBlockMs)
registerProducer(producer)
}
@@ -344,7 +347,7 @@ abstract class BaseProducerSendTest extends
KafkaServerTestHarness {
*/
@Test
def testSendBeforeAndAfterPartitionExpansion() {
- val producer = createProducer(brokerList)
+ val producer = createProducer(brokerList, maxBlockMs = 5 * 1000L)
// create topic
createTopic(topic, 1, 2)
@@ -364,10 +367,13 @@ abstract class BaseProducerSendTest extends
KafkaServerTestHarness {
// Trying to send a record to a partition beyond topic's partition range
before adding the partition should fail.
val partition1 = 1
try {
- producer.send(new ProducerRecord(topic, partition1, null,
"value".getBytes(StandardCharsets.UTF_8)))
+ producer.send(new ProducerRecord(topic, partition1, null,
"value".getBytes(StandardCharsets.UTF_8))).get()
fail("Should not allow sending a record to a partition not present in
the metadata")
} catch {
- case _: KafkaException => // this is ok
+ case e: ExecutionException => e.getCause match {
+ case _: TimeoutException => // this is ok
+ case ex => throw new Exception("Sending to a partition not present in
the metadata should result in a TimeoutException", ex)
+ }
}
val existingAssignment =
zkClient.getReplicaAssignmentForTopics(Set(topic)).map {
diff --git
a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index b7d3ecb..17d68d1 100644
---
a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++
b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -17,7 +17,7 @@
package kafka.api
-import java.util.concurrent.{ExecutionException, TimeoutException}
+import java.util.concurrent.ExecutionException
import java.util.Properties
import kafka.integration.KafkaServerTestHarness
@@ -25,7 +25,6 @@ import kafka.log.LogConfig
import kafka.server.KafkaConfig
import kafka.utils.TestUtils
import org.apache.kafka.clients.producer._
-import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.record.{DefaultRecord, DefaultRecordBatch}
@@ -182,8 +181,8 @@ class ProducerFailureHandlingTest extends
KafkaServerTestHarness {
}
/**
- * Send with invalid partition id should throw KafkaException when
partition is higher than the upper bound of
- * partitions.
+ * Send with invalid partition id should return ExecutionException caused
by TimeoutException
+ * when partition is higher than the upper bound of partitions.
*/
@Test
def testInvalidPartition() {
@@ -192,8 +191,11 @@ class ProducerFailureHandlingTest extends
KafkaServerTestHarness {
// create a record with incorrect partition id (higher than the number of
partitions), send should fail
val higherRecord = new ProducerRecord(topic1, 1, "key".getBytes,
"value".getBytes)
- intercept[KafkaException] {
- producer1.send(higherRecord)
+ intercept[ExecutionException] {
+ producer1.send(higherRecord).get
+ }.getCause match {
+ case _: TimeoutException => // this is ok
+ case ex => throw new Exception("Sending to a partition not present in
the metadata should result in a TimeoutException", ex)
}
}